diff --git a/benchmarks/benchmark_db_utils.py b/benchmarks/benchmark_db_utils.py
index edf54c18d6..ec696fb468 100644
--- a/benchmarks/benchmark_db_utils.py
+++ b/benchmarks/benchmark_db_utils.py
@@ -139,10 +139,10 @@ def write_run(
   # pylint: disable=import-outside-toplevel
-  from benchmark_db_writer import bq_writer_utils
-  from benchmark_db_writer import dataclass_bigquery_writer
-  from benchmark_db_writer.run_summary_writer import sample_run_summary_writer
-  from benchmark_db_writer.schema.workload_benchmark_v2 import workload_benchmark_v2_schema
+  from benchmarks.benchmark_db_writer import bq_writer_utils
+  from benchmarks.benchmark_db_writer import dataclass_bigquery_writer
+  from benchmarks.benchmark_db_writer.run_summary_writer import run_summary_writer
+  from benchmarks.benchmark_db_writer.schema.workload_benchmark_v2 import workload_benchmark_v2_schema

   def get_db_client(
       project: str, dataset: str, table: str, dataclass_type: Type, is_test: bool = False
@@ -168,9 +168,9 @@ def get_db_client(
   print(options.model_id)

   if (
-      sample_run_summary_writer.validate_model_id(options.model_id, options.is_test)
-      and sample_run_summary_writer.validate_hardware_id(options.hardware_id, options.is_test)
-      and sample_run_summary_writer.validate_software_id(options.software_id, options.is_test)
+      run_summary_writer.validate_model_id(options.model_id, options.is_test)
+      and run_summary_writer.validate_hardware_id(options.hardware_id, options.is_test)
+      and run_summary_writer.validate_software_id(options.software_id, options.is_test)
   ):
     summary = workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema(
         run_id=f"run-{uuid.uuid4()}",
@@ -179,6 +179,7 @@
         hardware_id=options.hardware_id,
         hardware_num_chips=number_of_chips,
         hardware_num_nodes=number_of_nodes,
+        hardware_num_slices=options.hardware_num_slices,
         result_success=run_success,
         configs_framework=framework_config_in_json,
         configs_env=env_variables,
diff --git a/benchmarks/benchmark_db_writer/__init__.py b/benchmarks/benchmark_db_writer/__init__.py
new file mode 100644
index 0000000000..e51be79ff0
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/__init__.py
@@ -0,0 +1,3 @@
+import datetime
+
+__version__ = "1.0.0.dev" + datetime.datetime.now().strftime("%Y%m%d")
diff --git a/benchmarks/benchmark_db_writer/bigquery_types.py b/benchmarks/benchmark_db_writer/bigquery_types.py
new file mode 100644
index 0000000000..893df510a0
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/bigquery_types.py
@@ -0,0 +1,53 @@
+import datetime
+import decimal
+import enum
+from typing import Dict, NewType, Type
+
+
+class BigQueryFieldModes(str, enum.Enum):
+  NULLABLE = "NULLABLE"
+  REQUIRED = "REQUIRED"
+  REPEATED = "REPEATED"
+
+
+class BigQueryTypes(str, enum.Enum):
+  STRING = "STRING"
+  BYTES = "BYTES"
+  INTEGER = "INT64"
+  INT64 = "INT64"
+  FLOAT64 = "FLOAT64"
+  FLOAT = "FLOAT64"
+  NUMERIC = "NUMERIC"
+  BOOL = "BOOL"
+  BOOLEAN = "BOOL"
+  STRUCT = "STRUCT"
+  RECORD = "STRUCT"
+  TIMESTAMP = "TIMESTAMP"
+  DATE = "DATE"
+  TIME = "TIME"
+  DATETIME = "DATETIME"
+  GEOGRAPHY = "GEOGRAPHY"
+  JSON = "JSON"
+
+
+Geography = NewType("Geography", str)
+
+
+class TimeStamp(datetime.datetime):
+  pass
+
+
+TypeMapping: Dict[BigQueryTypes, Type] = {
+    BigQueryTypes.STRING: str,
+    BigQueryTypes.BYTES: bytes,
+    BigQueryTypes.INT64: int,
+    BigQueryTypes.FLOAT64: float,
+    BigQueryTypes.NUMERIC: decimal.Decimal,
+    BigQueryTypes.BOOL: bool,
+    BigQueryTypes.TIMESTAMP: TimeStamp,
+    BigQueryTypes.DATE: datetime.date,
+    BigQueryTypes.TIME: datetime.time,
+    BigQueryTypes.DATETIME: datetime.datetime,
+    BigQueryTypes.GEOGRAPHY: Geography,
+    BigQueryTypes.JSON: dict,
+}
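The `TypeMapping` table above is the bridge between Python annotations and BigQuery column types; `dataclass_converter_utils` later in this diff inverts it to look up a column type for each dataclass field. A minimal sketch of that lookup (enum aliases such as `FLOAT` resolve to their canonical member):

```python
from benchmarks.benchmark_db_writer import bigquery_types

# Invert the mapping, as dataclass_converter_utils does with _BASIC_TYPES_TO_NAME:
# each Python type resolves to one canonical BigQueryTypes member.
basic_types = {py_type: bq_type for bq_type, py_type in bigquery_types.TypeMapping.items()}

print(basic_types[int])    # BigQueryTypes.INTEGER (value "INT64")
print(basic_types[float])  # BigQueryTypes.FLOAT64
print(basic_types[dict])   # BigQueryTypes.JSON
```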
diff --git a/benchmarks/benchmark_db_writer/bq_info_writer/__init__.py b/benchmarks/benchmark_db_writer/bq_info_writer/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/benchmarks/benchmark_db_writer/bq_info_writer/hardware_info_writer.py b/benchmarks/benchmark_db_writer/bq_info_writer/hardware_info_writer.py
new file mode 100644
index 0000000000..67b61db3ea
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/bq_info_writer/hardware_info_writer.py
@@ -0,0 +1,111 @@
+"""TODO: Update hardware info in the main function & run the script."""
+
+import logging
+import os
+
+from benchmarks.benchmark_db_writer import bq_writer_utils
+from benchmarks.benchmark_db_writer.schema.workload_benchmark_v2 import (
+    hardware_info_schema,
+)
+
+logging.basicConfig(
+    format="%(asctime)s %(levelname)-8s %(message)s",
+    level=logging.INFO,
+    datefmt="%Y-%m-%d %H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
+
+def write_hardware_config(
+    project,
+    dataset,
+    table,
+    dataclass_type,
+    hardware_id,
+    gcp_accelerator_name,
+    chip_name,
+    bf_16_tflops,
+    memory,
+    hardware_type,
+    provider_name,
+    chips_per_node=None,
+    update_person_ldap=os.getenv("USER", "mrv2"),
+    description="",
+    other="",
+    host_memory=None,
+    host_vcpus=None,
+):
+
+  writer = bq_writer_utils.create_bq_writer_object(
+      project=project,
+      dataset=dataset,
+      table=table,
+      dataclass_type=dataclass_type,
+  )
+
+  hardware_info = writer.query(where={"hardware_id": hardware_id})
+  if hardware_info:
+    raise ValueError("Hardware id %s is already present in the %s table" % (hardware_id, table))
+
+  hardware_data = hardware_info_schema.HardwareInfo(
+      hardware_id=hardware_id,
+      gcp_accelerator_name=gcp_accelerator_name,
+      chip_name=chip_name,
+      bf_16_tflops=bf_16_tflops,
+      memory=memory,
+      chips_per_node=chips_per_node,
+      hardware_type=hardware_type,
+      provider_name=provider_name,
+      update_person_ldap=update_person_ldap,
+      description=description,
+      other=other,
+      host_memory=host_memory,
+      host_vcpus=host_vcpus,
+  )
+
+  logging.info("Writing Data %s to %s table.", hardware_data, table)
+  writer.write([hardware_data])
+
+
+if __name__ == "__main__":
+
+  table_configs = [
+      {
+          "project": "ml-workload-benchmarks",
+          "dataset": "benchmark_dataset_v2",
+          "table": "hardware_info",
+      },
+      {
+          "project": "supercomputer-testing",
+          "dataset": "mantaray_v2",
+          "table": "hardware_info",
+      },
+  ]
+
+  # Update it on every run
+  hardware_id = "a4"
+  gcp_accelerator_name = "A4"
+  chip_name = "B200"
+  bf_16_tflops = 2237
+  memory = 180
+  chips_per_node = 8
+  hardware_type = "GPU"
+  provider_name = "Nvidia"
+  description = ""
+
+  for table_config in table_configs:
+    write_hardware_config(
+        project=table_config["project"],
+        dataset=table_config["dataset"],
+        table=table_config["table"],
+        dataclass_type=hardware_info_schema.HardwareInfo,
+        hardware_id=hardware_id,
+        gcp_accelerator_name=gcp_accelerator_name,
+        chip_name=chip_name,
+        bf_16_tflops=bf_16_tflops,
+        memory=memory,
+        chips_per_node=chips_per_node,
+        description=description,
+        hardware_type=hardware_type,
+        provider_name=provider_name,
+    )
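The duplicate check in `write_hardware_config` relies on `writer.query(where=...)`, which builds a parameterized `SELECT * FROM <table> WHERE hardware_id = @hardware_id` (see `DataclassBigQueryWriter.query` later in this diff). A minimal sketch of that check — the project and dataset names here are illustrative, not part of this change:

```python
from benchmarks.benchmark_db_writer import bq_writer_utils
from benchmarks.benchmark_db_writer.schema.workload_benchmark_v2 import hardware_info_schema

# Hypothetical test coordinates; substitute a project and dataset you control.
writer = bq_writer_utils.create_bq_writer_object(
    project="my-test-project",
    dataset="my_test_dataset",
    table="hardware_info",
    dataclass_type=hardware_info_schema.HardwareInfo,
)

# Returns a (possibly empty) list of HardwareInfo dataclass instances.
existing = writer.query(where={"hardware_id": "a4"})
print("already present" if existing else "safe to insert")
```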
diff --git a/benchmarks/benchmark_db_writer/bq_info_writer/microbenchmark_workload_info_writer.py b/benchmarks/benchmark_db_writer/bq_info_writer/microbenchmark_workload_info_writer.py
new file mode 100644
index 0000000000..88ce2320bd
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/bq_info_writer/microbenchmark_workload_info_writer.py
@@ -0,0 +1,82 @@
+"""TODO: Update workload info in the main function & run the script."""
+
+import logging
+import os
+
+from benchmarks.benchmark_db_writer import bq_writer_utils
+from benchmarks.benchmark_db_writer.schema.workload_benchmark_v2 import (
+    microbenchmark_workload_info_schema,
+)
+
+logging.basicConfig(
+    format="%(asctime)s %(levelname)-8s %(message)s",
+    level=logging.INFO,
+    datefmt="%Y-%m-%d %H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
+
+def write_microbenchmark_workload_config(
+    project,
+    dataset,
+    table,
+    dataclass_type,
+    workload_id,
+    update_person_ldap=os.getenv("USER", "imo-eng"),
+    description="",
+):
+
+  writer = bq_writer_utils.create_bq_writer_object(
+      project=project,
+      dataset=dataset,
+      table=table,
+      dataclass_type=dataclass_type,
+  )
+
+  microbenchmark_workload_info = writer.query(where={"workload_id": workload_id})
+  if microbenchmark_workload_info:
+    raise ValueError("Workload id %s is already present in the %s table" % (workload_id, table))
+
+  workload_data = microbenchmark_workload_info_schema.MicrobenchmarkWorkloadInfo(
+      workload_id=workload_id,
+      update_person_ldap=update_person_ldap,
+      description=description,
+  )
+
+  logging.info("Writing Data %s to %s table.", workload_data, table)
+  writer.write([workload_data])
+
+
+def insert(workload_id, description=""):
+  table_configs = [
+      {
+          "project": "ml-workload-benchmarks",
+          "dataset": "benchmark_dataset_v2",
+          "table": "microbenchmark_workload_info",
+      },
+      {
+          "project": "supercomputer-testing",
+          "dataset": "mantaray_v2",
+          "table": "microbenchmark_workload_info",
+      },
+  ]
+
+  assert workload_id is not None
+
+  for table_config in table_configs:
+    write_microbenchmark_workload_config(
+        project=table_config["project"],
+        dataset=table_config["dataset"],
+        table=table_config["table"],
+        dataclass_type=microbenchmark_workload_info_schema.MicrobenchmarkWorkloadInfo,
+        workload_id=workload_id,
+        description=description,
+    )
+
+
+if __name__ == "__main__":
+
+  # workloads = ["all_gather", "ppermute", "psum", "psum_scatter"]
+  workloads = []
+  for workload in workloads:
+    insert(workload, "")
diff --git a/benchmarks/benchmark_db_writer/bq_info_writer/model_info_writer.py b/benchmarks/benchmark_db_writer/bq_info_writer/model_info_writer.py
new file mode 100644
index 0000000000..c9a812ea14
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/bq_info_writer/model_info_writer.py
@@ -0,0 +1,105 @@
+"""TODO: Update model info in the main function & run the script."""
+
+import logging
+import os
+
+from benchmarks.benchmark_db_writer import bq_writer_utils
+from benchmarks.benchmark_db_writer.schema.workload_benchmark_v2 import model_info_schema
+
+logging.basicConfig(
+    format="%(asctime)s %(levelname)-8s %(message)s",
+    level=logging.INFO,
+    datefmt="%Y-%m-%d %H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
+
+def write_model_config(
+    project,
+    dataset,
+    table,
+    dataclass_type,
+    model_id,
+    name,
+    variant,
+    parameter_size_in_billions,
+    update_person_ldap=os.getenv("USER", "mrv2"),
+    description="",
+    details="",
+):
+
+  writer = bq_writer_utils.create_bq_writer_object(
+      project=project,
+      dataset=dataset,
+      table=table,
+      dataclass_type=dataclass_type,
+  )
+
+  model_info = writer.query(where={"model_id": model_id})
+  if model_info:
+    raise ValueError("Model id %s is already present in the %s table" % (model_id, table))
+
+  # Check if there is already a model info based on name,
+  # variant and parameter size
+  model_info = writer.query(
+      where={
+          "name": name,
+          "variant": variant,
+          "parameter_size_in_billions": parameter_size_in_billions,
+      }
+  )
+  if model_info:
+    raise ValueError(
+        "Model with name %s, variant %s and "
+        "parameter size %s is already present in the %s "
+        "table" % (name, variant, parameter_size_in_billions, table)
+    )
+
+  model_data = model_info_schema.ModelInfo(
+      model_id=model_id,
+      name=name,
+      variant=variant,
+      parameter_size_in_billions=parameter_size_in_billions,
+      update_person_ldap=update_person_ldap,
+      description=description,
+      details=details,
+  )
+
+  logging.info("Writing Data %s to %s table.", model_data, table)
+  writer.write([model_data])
+
+
+if __name__ == "__main__":
+
+  table_configs = [
+      {
+          "project": "ml-workload-benchmarks",
+          "dataset": "benchmark_dataset_v2",
+          "table": "model_info",
+      },
+      {
+          "project": "supercomputer-testing",
+          "dataset": "mantaray_v2",
+          "table": "model_info",
+      },
+  ]
+
+  # Update it on every run
+  model_id = "mistral-7b"
+  name = "Mistral"
+  variant = "7B"
+  parameter_size_in_billions = 7
+  description = "https://huggingface.co/mistralai/Mistral-7B-v0.3"
+
+  for table_config in table_configs:
+    write_model_config(
+        project=table_config["project"],
+        dataset=table_config["dataset"],
+        table=table_config["table"],
+        model_id=model_id,
+        dataclass_type=model_info_schema.ModelInfo,
+        name=name,
+        variant=variant,
+        parameter_size_in_billions=parameter_size_in_billions,
+        description=description,
+    )
diff --git a/benchmarks/benchmark_db_writer/bq_info_writer/software_info_writer.py b/benchmarks/benchmark_db_writer/bq_info_writer/software_info_writer.py
new file mode 100644
index 0000000000..4afef7b94f
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/bq_info_writer/software_info_writer.py
@@ -0,0 +1,83 @@
+"""TODO: Update software info in the main function & run the script."""
+
+import logging
+import os
+
+from benchmarks.benchmark_db_writer import bq_writer_utils
+from benchmarks.benchmark_db_writer.schema.workload_benchmark_v2 import (
+    software_info_schema,
+)
+
+logging.basicConfig(
+    format="%(asctime)s %(levelname)-8s %(message)s",
+    level=logging.INFO,
+    datefmt="%Y-%m-%d %H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
+
+def write_software_config(
+    project,
+    dataset,
+    table,
+    dataclass_type,
+    software_id,
+    ml_framework,
+    os,  # NB: shadows the os module inside this function; the default below is bound at definition time
+    compiler,
+    training_framework,
+    update_person_ldap=os.getenv("USER", "mrv2"),
+    description="",
+):
+
+  writer = bq_writer_utils.create_bq_writer_object(
+      project=project,
+      dataset=dataset,
+      table=table,
+      dataclass_type=dataclass_type,
+  )
+
+  software_info = writer.query(where={"software_id": software_id})
+  if software_info:
+    raise ValueError("Software id %s is already present in the %s table" % (software_id, table))
+
+  software_data = software_info_schema.SoftwareInfo(
+      software_id=software_id,
+      ml_framework=ml_framework,
+      os=os,
+      compiler=compiler,
+      training_framework=training_framework,
+      update_person_ldap=update_person_ldap,
+      description=description,
+  )
+
+  logging.info("Writing Data %s to %s table.", software_data, table)
+  writer.write([software_data])
+
+
+if __name__ == "__main__":
+
+  project = "ml-workload-benchmarks"
+  dataset = "benchmark_dataset_v2"
+  table = "software_info"
+
+  # Update it on every run
+  software_id = "jax_maxtext"
+  ml_framework = "JAX"
+  os = "cos"
+  compiler = "XLA"
+  training_framework = "MaxText"
+  description = "https://github.com/AI-Hypercomputer/maxtext"
+
+  write_software_config(
+      project=project,
+      dataset=dataset,
+      table=table,
+      dataclass_type=software_info_schema.SoftwareInfo,
+      software_id=software_id,
+      ml_framework=ml_framework,
+      os=os,
+      compiler=compiler,
+      training_framework=training_framework,
+      description=description,
+  )
diff --git a/benchmarks/benchmark_db_writer/bq_info_writer/storage_info_writer.py b/benchmarks/benchmark_db_writer/bq_info_writer/storage_info_writer.py
new file mode 100644
index 0000000000..7a7bb11205
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/bq_info_writer/storage_info_writer.py
@@ -0,0 +1,67 @@
+"""Update the storage_info table of the benchmark dataset."""
+
+import logging
+from typing import Sequence
+
+from absl import app
+from absl import flags
+from benchmarks.benchmark_db_writer import bq_writer_utils
+from benchmarks.benchmark_db_writer.schema.workload_benchmark_v2 import storage_info_schema
+
+_STORAGE_PRODUCT = flags.DEFINE_string("storage_product", "", "Type of the storage product.")
+_CONFIG = flags.DEFINE_string("config", "", "The configs of the storage system.")
+_DESCRIPTION = flags.DEFINE_string("description", "", "The description of the storage system.")
+_IS_TEST = flags.DEFINE_bool("is_test", False, "True to write the storage info to the test project.")
+
+logging.basicConfig(
+    format="%(asctime)s %(levelname)-8s %(message)s",
+    level=logging.INFO,
+    datefmt="%Y-%m-%d %H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
+
+def write_storage_config(
+    project,
+    dataset,
+    table,
+    dataclass_type,
+    storage_product,
+    config,
+    description,
+):
+
+  writer = bq_writer_utils.create_bq_writer_object(
+      project=project,
+      dataset=dataset,
+      table=table,
+      dataclass_type=dataclass_type,
+  )
+
+  storage_data = storage_info_schema.StorageInfo(
+      storage_id=None,
+      storage_product=storage_product,
+      config=config,
+      description=description,
+      update_person_ldap=None,
+      update_timestamp=None,
+  )
+
+  logging.info("Writing Data %s to %s table.", storage_data, table)
+  writer.write([storage_data])
+
+
+def main(_: Sequence[str]):
+  write_storage_config(
+      project=("supercomputer-testing" if _IS_TEST.value else "ml-workload-benchmarks"),
+      dataset="mantaray_v2" if _IS_TEST.value else "benchmark_dataset_v2",
+      table="storage_info",
+      dataclass_type=storage_info_schema.StorageInfo,
+      storage_product=_STORAGE_PRODUCT.value,
+      config=_CONFIG.value,
+      description=_DESCRIPTION.value,
+  )
+
+
+if __name__ == "__main__":
+  app.run(main)
+ """ + + project = "cloud-tpu-multipod-dev" if is_test else "ml-workload-benchmarks" + dataset = "benchmark_dataset" if is_test else "benchmark_dataset_v2" + return create_bq_writer_object( + project=project, + dataset=dataset, + table=table, + dataclass_type=dataclass_type, + ) + + +def validate_id( + logger: logging.Logger, + id_value: str, + table_name: str, + id_field: str, + dataclass_type: Type, + is_test: bool = False, +) -> bool: + """Generic function to validate an ID against a BigQuery table. + + Args: + logger: The logging instance which represents a single logging channel. + id_value: The ID value to validate. + table_name: The name of the BigQuery table. + id_field: The name of the ID field in the table. + dataclass_type: The dataclass type corresponding to the table schema. + is_test: Whether to use the testing project or the production project. + + Returns: + True if the ID is valid, False otherwise. + """ + + client = get_db_client(table_name, dataclass_type, is_test) + result = client.query(where={id_field: id_value}) + + if not result: + logger.info( + "%s: %s is not present in the %s table ", + id_field.capitalize(), + id_value, + table_name, + ) + logger.info( + "Please add %s specific row in %s table before adding the new rows to" " the target table", + id_value, + table_name, + ) + return False + return True diff --git a/benchmarks/benchmark_db_writer/dataclass_bigquery_writer.py b/benchmarks/benchmark_db_writer/dataclass_bigquery_writer.py new file mode 100644 index 0000000000..4ce7865b26 --- /dev/null +++ b/benchmarks/benchmark_db_writer/dataclass_bigquery_writer.py @@ -0,0 +1,266 @@ +import copy +import dataclasses +import logging +import pprint +import time +from typing import Any, Generic, List, Optional, Sequence, Type, TypeVar + +from benchmarks.benchmark_db_writer import bigquery_types +from benchmarks.benchmark_db_writer import dataclass_converter_utils +from benchmarks.benchmark_db_writer import row_transformer_utils +import google.api_core.exceptions +from google.cloud import bigquery + +# The type of the generic dataclass +T = TypeVar("T") + +logger = logging.getLogger(__name__) + + +def _field_type_str(field_type: str): + """Normalizes the field type to a string. + + Args: + field_type: the field type to convert to a string. + + Returns: + The string representation of the field type. + """ + if isinstance(field_type, bigquery_types.BigQueryTypes): + return field_type.value + else: + return bigquery_types.BigQueryTypes[field_type].value + + +def _field_to_dict(field: bigquery.schema.SchemaField): + """A concise dict representation of a SchemaField. + + This is only to compare schemas to check if the schema fields have changed. + + Args: + field: the schema field to convert to a dict + + Returns: + A dict representation of the schema field. + """ + return { + "field_type": _field_type_str(field.field_type), + "mode": field.mode, + "fields": schema_to_dict(field.fields), + } + + +def schema_to_dict(schema: Sequence[bigquery.schema.SchemaField]): + """A concise dict representation of a bigquery schema. + + This is used to compare the current schema against the dataclass generated + schema. + + Args: + schema: the schema to convert to a dict. + + Returns: + A dict representation of the schema field. 
+ """ + return {field.name: _field_to_dict(field) for field in schema} + + +def _recursive_struct_param( + name: str, schema: dict[str, Any], values: Optional[dict[str, Any]] = None +) -> bigquery.StructQueryParameter: + params = [] + # match up schema to values + for field_name, field_schema in schema.items(): + value = values[field_name] if values else None + param = _query_param(field_name, field_schema, value) + assert param + params.append(param) + return bigquery.StructQueryParameter(name, *params) + + +def _query_param(name: str, schema_field: dict[str, Any], value: Any): # -> bigquery._AbstractQueryParameter: + if schema_field["field_type"] == "STRUCT": + assert value is None or isinstance(value, dict) + # recurse the schema even for None/NULL values which we have to propagate + # all the way through the struct + return _recursive_struct_param(name, schema=schema_field["fields"], values=value) + else: + return bigquery.ScalarQueryParameter(name, schema_field["field_type"], value) + + +@dataclasses.dataclass +class BigqueryWriterConfig: + project: str + dataset: str + table: str + + +class DataclassBigQueryWriter(Generic[T]): + """Uses the `bq-schema` package to write a dataclass to a BigQuery table.""" + + def __init__(self, dataclass_type: Type[T], config: BigqueryWriterConfig): + """Initializes the writer. + + Args: + dataclass_type: the dataclass type to use as the schema + project: the GCP project to write to + dataset: the dataset to write to + table: the table to write to + """ + self.client = bigquery.Client(project=config.project) + self.row_transformer = None + self.table_id = f"{config.project}.{config.dataset}.{config.table}" + self.dataclass_type = dataclass_type + + self.input_data_schema = dataclass_converter_utils.dataclass_to_schema(self.dataclass_type) + # Get or create table + try: + self.table = self.client.get_table(self.table_id) + except google.api_core.exceptions.NotFound: + logger.warning("Table %s not found, creating it", self.table_id) + self.client.create_table(self.table_id) + self.table = self.client.get_table(self.table_id) + # When creating the table for the first time, always update schema. + self.update_schema() + + # Check schema of table and input dataclass + self.check_schema() + + def check_schema(self): + table_schema = schema_to_dict(self.table.schema) + data_schema = schema_to_dict(self.input_data_schema) + + # Check whether dataclass has any additional column + for dataclass_column in data_schema.keys(): + if dataclass_column not in table_schema: + raise ValueError( + f"Schema of table {self.table_id} is different than input data." + " Please check both schema and re-run.\n" + f"Column: {dataclass_column} is absent in table whereas it's " + "present in dataclass." + ) + + # Check whether big query table has any additional column which are not "nullable" + for table_column, column_attributes in table_schema.items(): + if table_column not in data_schema and column_attributes["mode"] != bigquery_types.BigQueryFieldModes.NULLABLE: + + raise ValueError( + f"Schema of table {self.table_id} is different than input data." + " Please check both schema and re-run.\n" + f"Column: {table_column} is absent in dataclass whereas it's " + "present in table & is of Required type." 
+
+  def update_schema(self):
+    """When a new table is created, this function gets called to update the schema."""
+    logger.info(
+        "DataclassBigQueryWriter: updating schema to %s",
+        pprint.pformat(self.input_data_schema),
+    )
+    old_schema = copy.deepcopy(self.table.schema)
+    try:
+      self.table.schema = self.input_data_schema
+      self.table = self.client.update_table(self.table, ["schema"])
+      logger.info("BigQueryResultWriter: waiting for some time for the schema to propagate")
+      time.sleep(60)
+    except Exception as e:
+      logger.exception("Failed to update bigquery schema with error %s", e)
+      self.table.schema = old_schema
+
+  def transform(self, dataclass: T) -> dict:
+    return row_transformer_utils.RowTransformer.dataclass_instance_to_bq_row(dataclass)
+
+  def read(self, where: Optional[str] = None) -> tuple[list[T], list[T]]:
+    """Reads the bigquery table using `where` as the WHERE clause.
+
+    Args:
+      where: used as the `WHERE` expression when querying the database.
+
+    Returns:
+      The list of bigquery entries as the dataclass T.
+    """
+    row_transformer = row_transformer_utils.RowTransformer[T](self.dataclass_type)
+    query = "SELECT * FROM " + self.table_id
+    if where:
+      query += " WHERE " + where
+    raw_rows = []
+    rows = []
+    for bq_row in self.client.query(query=query):
+      raw_rows.append(bq_row)
+      dataclass = row_transformer.bq_row_to_dataclass_instance(bq_row)
+      assert isinstance(dataclass, self.dataclass_type)
+      rows.append(dataclass)
+    return rows, raw_rows
+
+  def _get_field_schema_dict(self, field_name):
+    schema_dict = {"fields": schema_to_dict(self.input_data_schema)}
+
+    field_dir = field_name.split(".")
+    for key in field_dir:
+      schema_dict = schema_dict["fields"][key]
+    return schema_dict
+
+  def _get_query_for_value(self, field_name, value):  # -> Tuple[str, bigquery._AbstractQueryParameter]:
+    if dataclasses.is_dataclass(value):
+      value = row_transformer_utils.RowTransformer.dataclass_instance_to_bq_row(value)
+    # Find the schema for `field_name`:
+    field_schema = self._get_field_schema_dict(field_name)
+    at_name = "_".join(field_name.split("."))
+    return f"{field_name} = @{at_name}", _query_param(at_name, field_schema, value)
+
+  def query_column(self, column_name) -> List[Any]:
+    """Returns all values of the given column name."""
+
+    query_str = f"SELECT {column_name} FROM {self.table_id}"
+    query_result = self.client.query(query=query_str)
+
+    return [row[0] for row in query_result]
+
+  def query(self, where: Optional[dict[str, Any]] = None) -> list[T]:
+    """Reads the bigquery table using the `where` dict as the WHERE clause.
+
+    Args:
+      where: a dict of key/value pairs from which the WHERE clause is constructed.
+
+    Returns:
+      The list of bigquery entries as the dataclass T.
+    """
+    where_exprs = []
+    params = []
+    for field_name, value in (where or {}).items():
+      where_expr, param = self._get_query_for_value(field_name, value)
+      params.append(param)
+      where_exprs.append(where_expr)
+    query_str = f"SELECT * FROM {self.table_id}"
+    if where_exprs:
+      where_stmt = " AND ".join(where_exprs)
+      query_str += f" WHERE {where_stmt}"
+    job_config = bigquery.QueryJobConfig(query_parameters=params)
+
+    row_transformer = row_transformer_utils.RowTransformer[T](self.dataclass_type)
+    rows = []
+    for bq_row in self.client.query(query=query_str, job_config=job_config):
+      dataclass = row_transformer.bq_row_to_dataclass_instance(bq_row)
+      assert isinstance(dataclass, self.dataclass_type)
+      rows.append(dataclass)
+    return rows
+
+  def write(self, rows: List[T]):
+    """Bulk write to big query.
+
+    Args:
+      rows: list of rows (dataclasses) to write to bigquery
+    """
+    serialized_rows = [self.transform(row) for row in rows]
+    try:
+      logger.info("Writing to BigQuery: %d rows", len(serialized_rows))
+      insert_errors = self.client.insert_rows(table=self.table, rows=serialized_rows)
+      if insert_errors:
+        logger.error(
+            "There were errors while writing to Bigquery:\n%s",
+            pprint.pformat(insert_errors),
+        )
+      else:
+        logger.info("Successfully wrote to BigQuery")
+    except Exception as e:
+      logger.exception("Failed to write to BigQuery with error %s", e)
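A minimal end-to-end sketch of the writer, assuming a project and dataset you own plus permission to create tables (all names below are illustrative, not part of this change):

```python
import dataclasses
from typing import Optional

from benchmarks.benchmark_db_writer import dataclass_bigquery_writer


@dataclasses.dataclass
class DemoRow:  # hypothetical schema, for illustration only
  run_id: str
  metric: Optional[float] = None


config = dataclass_bigquery_writer.BigqueryWriterConfig(
    project="my-test-project", dataset="my_test_dataset", table="demo_rows"
)
# Creates the table (and pushes the dataclass schema) if it does not exist,
# then checks the table schema against the dataclass.
writer = dataclass_bigquery_writer.DataclassBigQueryWriter(DemoRow, config)

writer.write([DemoRow(run_id="run-123", metric=0.5)])
# Parameterized read-back: SELECT * ... WHERE run_id = @run_id under the hood.
print(writer.query(where={"run_id": "run-123"}))
```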
diff --git a/benchmarks/benchmark_db_writer/dataclass_converter_utils.py b/benchmarks/benchmark_db_writer/dataclass_converter_utils.py
new file mode 100644
index 0000000000..1a4dda6cce
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/dataclass_converter_utils.py
@@ -0,0 +1,132 @@
+"""Convert a python dataclass into a BigQuery schema definition."""
+
+import dataclasses
+import logging
+from typing import Any, List, Optional, Type, Union, get_type_hints
+
+from benchmarks.benchmark_db_writer import bigquery_types
+from google.cloud.bigquery import SchemaField
+import typing_extensions
+
+logger = logging.getLogger(__name__)
+
+_BASIC_TYPES_TO_NAME = {primitive_type: bq_type for bq_type, primitive_type in bigquery_types.TypeMapping.items()}
+_NoneType = type(None)
+
+
+def parse_inner_type_of_list(list_type: Any) -> Type:
+  return typing_extensions.get_args(list_type)[0]
+
+
+def parse_inner_type_of_optional(optional_type: Any) -> Type:
+  args = typing_extensions.get_args(optional_type)
+  if not (len(args) == 2 and any(arg is _NoneType for arg in args)):
+    raise TypeError(f"Unsupported type: {optional_type}.")
+
+  return next(arg for arg in args if arg is not _NoneType)
+
+
+def _parse_field_description(field: dataclasses.Field) -> Optional[str]:
+  if "description" in field.metadata:
+    return field.metadata["description"]
+  return None
+
+
+def _parse_fields(field_type: Type) -> List[SchemaField]:
+  """Recursive call for nested dataclasses."""
+
+  if dataclasses.is_dataclass(field_type):
+    return dataclass_to_schema(field_type)
+  return []
+
+
+def _parse_list(field: dataclasses.Field) -> SchemaField:
+  field_type = parse_inner_type_of_list(field.type)
+  return SchemaField(
+      name=field.name,
+      field_type=_python_type_to_big_query_type(field_type),
+      mode=bigquery_types.BigQueryFieldModes.REPEATED,
+      description=_parse_field_description(field),
+      fields=_parse_fields(field_type),
+  )
+
+
+def _python_type_to_big_query_type(
+    field_type: Any,
+) -> bigquery_types.BigQueryTypes:
+  if dataclasses.is_dataclass(field_type):
+    return bigquery_types.BigQueryTypes.STRUCT
+
+  bq_type = _BASIC_TYPES_TO_NAME.get(field_type)
+  if bq_type:
+    return bq_type
+
+  raise TypeError(f"Unsupported type: {field_type}")
+
+
+def _parse_optional(field: dataclasses.Field) -> SchemaField:
+  field_type = parse_inner_type_of_optional(field.type)
+  return SchemaField(
+      name=field.name,
+      field_type=_python_type_to_big_query_type(field_type),
+      mode=bigquery_types.BigQueryFieldModes.NULLABLE,
+      description=_parse_field_description(field),
+      fields=_parse_fields(field_type),
+  )
+
+
+def _field_to_schema(field: dataclasses.Field) -> SchemaField:
+  field_type = _BASIC_TYPES_TO_NAME.get(field.type)
+  if field_type:
+    return SchemaField(
+        name=field.name,
+        field_type=field_type,
+        description=_parse_field_description(field),
+        mode=bigquery_types.BigQueryFieldModes.REQUIRED,
+    )
+
+  if dataclasses.is_dataclass(field.type):
+    return SchemaField(
+        name=field.name,
+        field_type=bigquery_types.BigQueryTypes.STRUCT,
+        mode=bigquery_types.BigQueryFieldModes.REQUIRED,
+        description=_parse_field_description(field),
+        fields=_parse_fields(field.type),
+    )
+
+  # typing.Optional is the same as typing.Union[SomeType, NoneType]
+  if typing_extensions.get_origin(field.type) is Union:
+    return _parse_optional(field)
+
+  if typing_extensions.get_origin(field.type) is list:
+    return _parse_list(field)
+
+  raise TypeError(f"Unsupported type: {field.type}.")
+
+
+def dataclass_to_schema(dataclass: Type, localns: Optional[dict] = None) -> List[SchemaField]:
+  """Transform a dataclass into a list of SchemaField.
+
+  If you want to transform a dataclass that is not defined in the
+  global scope you need to pass your locals.
+
+  def my_func():
+    @dataclass
+    class Example1:
+      a: int
+
+    @dataclass
+    class Example2:
+      b: Example1
+
+    dataclass_to_schema(Example2, localns=locals())
+  """
+  if not dataclasses.is_dataclass(dataclass):
+    raise TypeError("Not a dataclass.")
+
+  type_hints = get_type_hints(dataclass, localns=localns)
+  dataclass_fields = dataclasses.fields(dataclass)
+
+  for field in dataclass_fields:
+    field.type = type_hints[field.name]
+  return [_field_to_schema(field) for field in dataclass_fields]
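The conversion rules in a nutshell: a plain annotation becomes a REQUIRED column, `Optional[...]` becomes NULLABLE, `List[...]` becomes REPEATED, and a nested dataclass becomes a STRUCT. A small self-contained sketch:

```python
import dataclasses
from typing import List, Optional

from benchmarks.benchmark_db_writer import dataclass_converter_utils


@dataclasses.dataclass
class Example:  # illustrative only
  run_id: str                                                # REQUIRED STRING
  mfu: Optional[float] = None                                # NULLABLE FLOAT64
  tags: List[str] = dataclasses.field(default_factory=list)  # REPEATED STRING


for field in dataclass_converter_utils.dataclass_to_schema(Example):
  print(field.name, field.field_type, field.mode)
```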
%(levelname)-8s %(message)s", + level=logging.INFO, + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) + + +def validate_workload_id(workload_id: str, is_test: bool = False) -> bool: + """Validates a workload ID against the microbenchmark_workload_info table.""" + + return bq_writer_utils.validate_id( + logger, + workload_id, + "microbenchmark_workload_info", + "workload_id", + microbenchmark_workload_info_schema.MicrobenchmarkWorkloadInfo, + is_test, + ) + + +def validate_hardware_id(hardware_id: str, is_test: bool = False) -> bool: + """Validates a hardware ID against the hardware_info table.""" + return bq_writer_utils.validate_id( + logger, + hardware_id, + "hardware_info", + "hardware_id", + hardware_info_schema.HardwareInfo, + is_test, + ) + + +def write_run_manually( + run_id: str, + workload_id: str, + workload_parameters: str, + run_date: bigquery_types.TimeStamp, + hardware_id: str, + hardware_num_chips: int, + hardware_num_nodes: Optional[int], + hardware_num_slices: Optional[str], + result_success: bool, + configs_num_iterations: int, + configs_other: Optional[str], + logs_artifact_directory: Optional[str], + benchmarker_ldap: str, + run_source: str, + run_type: str, + metrics_type: str, + metrics_unit: str, + metrics_p50: float, + metrics_p90: float, + metrics_p99: float, + metrics_avg: Optional[float], + metrics_stdev: Optional[float], + metrics_other: Optional[str], + result_error: Optional[str], + logs_profile: Optional[str], + logs_cloud_logs: Optional[str], + logs_comments: Optional[str], + logs_other: Optional[str], + update_person_ldap: str = os.getenv("USER"), + is_test: bool = True, +) -> None: + """Writes a microbenchmark run manually to the database. + + This function validates the provided workload ID and, if valid, constructs a + MicrobenchmarkRunSummarySchema object with the given data and writes it to the + "microbenchmark_run_summary" table in BigQuery. + + Args: + is_test: Whether to use the testing project or the production project. + + Raises: + ValueError: If any of the IDs are invalid. 
+ """ + + if validate_workload_id(workload_id, is_test) and validate_hardware_id(hardware_id, is_test): + + summary = microbenchmark_run_summary_schema.MicrobenchmarkRunSummarySchema( + run_id=run_id if run_id else f"run-{uuid.uuid4()}", + workload_id=workload_id, + workload_parameters=workload_parameters, + run_date=run_date, + hardware_id=hardware_id, + hardware_num_chips=hardware_num_chips, + hardware_num_nodes=hardware_num_nodes, + hardware_num_slices=hardware_num_slices, + result_success=result_success, + configs_num_iterations=configs_num_iterations, + configs_other=configs_other, + logs_artifact_directory=logs_artifact_directory, + benchmarker_ldap=benchmarker_ldap, + run_source=run_source, + run_type=run_type, + metrics_type=metrics_type, + metrics_unit=metrics_unit, + metrics_p50=metrics_p50, + metrics_p90=metrics_p90, + metrics_p99=metrics_p99, + metrics_avg=metrics_avg, + metrics_stdev=metrics_stdev, + metrics_other=metrics_other, + result_error=result_error, + logs_profile=logs_profile, + logs_cloud_logs=logs_cloud_logs, + logs_comments=logs_comments, + logs_other=logs_other, + update_person_ldap=update_person_ldap, + ) + + client = bq_writer_utils.get_db_client( + "microbenchmark_run_summary", + microbenchmark_run_summary_schema.MicrobenchmarkRunSummarySchema, + is_test, + ) + client.write([summary]) + + else: + raise ValueError("Could not upload data in run summary table") + + +def write_row(row, is_test): + kwargs = {k: None if v == "" else v for k, v in row.to_dict().items()} + kwargs["is_test"] = is_test + write_run_manually(**kwargs) + + +def upload_from_spreadsheet(spreadsheet_name, is_test): + # Uses supercomputer-testing service account + # benchmarkdb-gsheets-sa@supercomputer-testing.iam.gserviceaccount.com + # to authenticate. The GSheet must be shared with that service account. 
+
+
+def upload_from_spreadsheet(spreadsheet_name, is_test):
+  # Uses the supercomputer-testing service account
+  # benchmarkdb-gsheets-sa@supercomputer-testing.iam.gserviceaccount.com
+  # to authenticate. The GSheet must be shared with that service account.
+  gc = gspread.service_account()
+  sheet = gc.open(spreadsheet_name)
+  df = pd.DataFrame(sheet.sheet1.get_all_records())
+  print(df.dtypes)
+  rows_to_upload = df[~(df["Uploaded to BQ"].str.lower() == "true")]
+  rows_to_upload = rows_to_upload.drop("Uploaded to BQ", axis=1)
+  print(rows_to_upload)
+  rows_to_upload.apply(lambda row: write_row(row, is_test), axis=1)
+
+
+if __name__ == "__main__":
+  # spreadsheet = "Microbenchmark results Shared with service account for upload"
+  spreadsheet = None
+  upload_from_spreadsheet(
+      spreadsheet,
+      is_test=False,
+  )
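The spreadsheet flow assumes a sheet whose header row matches `write_run_manually`'s keyword arguments plus an `Uploaded to BQ` marker column. A small sketch of the filtering step with an in-memory frame (data is illustrative):

```python
import pandas as pd

df = pd.DataFrame(
    {
        "run_id": ["run-1", "run-2"],
        "Uploaded to BQ": ["TRUE", ""],
    }
)

# Same predicate as upload_from_spreadsheet: keep rows not yet marked TRUE,
# then drop the marker column so only schema kwargs remain.
rows_to_upload = df[~(df["Uploaded to BQ"].str.lower() == "true")]
rows_to_upload = rows_to_upload.drop("Uploaded to BQ", axis=1)
print(rows_to_upload)  # only run-2 remains
```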
+ """ + + assert "model_id" in kwargs and "hardware_id" in kwargs and "software_id" in kwargs + + model_id = kwargs["model_id"] + hardware_id = kwargs["hardware_id"] + software_id = kwargs["software_id"] + storage_id = kwargs["storage_id"] if "storage_id" in kwargs else None + if "run_id" not in kwargs: + kwargs["run_id"] = f"run-{uuid.uuid4()}" + if "experiment_id" not in kwargs: + kwargs["experiment_id"] = f"experiment-manual-{uuid.uuid4()}" + + kwargs["run_source"] = "manual" + + if ( + validate_model_id(model_id, is_test) + and validate_hardware_id(hardware_id, is_test) + and validate_software_id(software_id, is_test) + and validate_storage_id(storage_id, is_test) + ): + + summary = workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema(**kwargs) + + client = bq_writer_utils.get_db_client( + "run_summary", + workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema, + is_test, + ) + client.write([summary]) + + else: + raise ValueError("Could not upload data in run summary table") + + +def write_row(row, is_test): + kwargs = {k: None if v == "" else v for k, v in row.to_dict().items()} + kwargs["is_test"] = is_test + write_run_manually(**kwargs) + + +def upload_from_spreadsheet(spreadsheet_name, is_test): + # Uses supercomputer-testing service account + # benchmarkdb-gsheets-sa@supercomputer-testing.iam.gserviceaccount.com + # to authenticate. The GSheet must be shared with that service account. + gc = gspread.service_account() + sheet = gc.open(spreadsheet_name) + df = pd.DataFrame(sheet.sheet1.get_all_records()) + print(df.dtypes) + rows_to_upload = df[~(df["Uploaded to BQ"].str.lower() == "true")] + rows_to_upload = rows_to_upload.drop("Uploaded to BQ", axis=1) + print(rows_to_upload) + rows_to_upload.apply(lambda row: write_row(row, is_test), axis=1) + + +if __name__ == "__main__": + # spreadsheet = "BenchmarkDB Training Dataset" + spreadsheet = None + upload_from_spreadsheet( + spreadsheet, + is_test=False, + ) diff --git a/benchmarks/benchmark_db_writer/run_summary_writer/test_run_summary_writer.py b/benchmarks/benchmark_db_writer/run_summary_writer/test_run_summary_writer.py new file mode 100644 index 0000000000..f95d5a787f --- /dev/null +++ b/benchmarks/benchmark_db_writer/run_summary_writer/test_run_summary_writer.py @@ -0,0 +1,28 @@ +"""This script validates the schema of big query table & dataclass present in code. + +Modify arguments of check_schema() method in main function +to check prod & test env. 
+""" + +from benchmarks.benchmark_db_writer import bq_writer_utils +from benchmarks.benchmark_db_writer.schema.workload_benchmark_v2 import ( + workload_benchmark_v2_schema, +) + + +def check_schema(is_test: bool = True) -> None: + + try: + bq_writer_utils.get_db_client( + "run_summary", + workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema, + is_test, + ) + print("No schema mismatch found") + except Exception as e: + print("Schema mismatch found.", e) + + +if __name__ == "__main__": + # Change is_test flat to True for test env's table + check_schema(is_test=True) diff --git a/benchmarks/benchmark_db_writer/schema/__init__.py b/benchmarks/benchmark_db_writer/schema/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/benchmarks/benchmark_db_writer/schema/supplemental_metrics/__init__.py b/benchmarks/benchmark_db_writer/schema/supplemental_metrics/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/benchmarks/benchmark_db_writer/schema/supplemental_metrics/checkpointing_metrics_schema.py b/benchmarks/benchmark_db_writer/schema/supplemental_metrics/checkpointing_metrics_schema.py new file mode 100644 index 0000000000..a657797dbf --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema/supplemental_metrics/checkpointing_metrics_schema.py @@ -0,0 +1,36 @@ +import dataclasses +import datetime +from typing import Optional +from benchmarks.benchmark_db_writer import bigquery_types + + +@dataclasses.dataclass +class CheckpointingMetricsInfo: + run_id: str + restore_time_p50: Optional[float] + restore_time_p90: Optional[float] + restore_time_p99: Optional[float] + restore_time_p100: Optional[float] + restore_time_max: Optional[float] + restore_time_min: Optional[float] + restore_time_avg: Optional[float] + restore_time_stddev: Optional[float] + restore_time_initial: Optional[float] + write_time_p50: Optional[float] + write_time_p90: Optional[float] + write_time_p99: Optional[float] + write_time_p100: Optional[float] + write_time_max: Optional[float] + write_time_min: Optional[float] + write_time_avg: Optional[float] + write_time_stddev: Optional[float] + accelerator_to_cpu_time_max: Optional[float] + accelerator_to_cpu_time_min: Optional[float] + accelerator_to_cpu_time_avg: Optional[float] + storage_save_time_max: Optional[float] + storage_save_time_min: Optional[float] + storage_save_time_avg: Optional[float] + num_restore_datapoints: Optional[int] + num_write_datapoints: Optional[int] + additional_metrics: Optional[dict] + update_timestamp: Optional[bigquery_types.TimeStamp] = datetime.datetime.now() diff --git a/benchmarks/benchmark_db_writer/schema/supplemental_metrics/data_loading_metrics_schema.py b/benchmarks/benchmark_db_writer/schema/supplemental_metrics/data_loading_metrics_schema.py new file mode 100644 index 0000000000..a95c624d8a --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema/supplemental_metrics/data_loading_metrics_schema.py @@ -0,0 +1,17 @@ +import dataclasses +import datetime +from typing import Optional +from benchmarks.benchmark_db_writer import bigquery_types + + +@dataclasses.dataclass +class DataLoadingMetricsInfo: + run_id: str + data_loading_tokens_per_sec_p50: Optional[float] + data_loading_tokens_per_sec_p90: Optional[float] + data_loading_tokens_per_sec_p99: Optional[float] + data_loading_tokens_per_sec_p100: Optional[float] + accelerator_blocked_time: Optional[float] + accelerator_blocked_percent: Optional[float] + additional_metrics: Optional[dict] + update_timestamp: Optional[bigquery_types.TimeStamp] = 
diff --git a/benchmarks/benchmark_db_writer/schema/supplemental_metrics/listing_metrics_schema.py b/benchmarks/benchmark_db_writer/schema/supplemental_metrics/listing_metrics_schema.py
new file mode 100644
index 0000000000..2e81261a55
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/schema/supplemental_metrics/listing_metrics_schema.py
@@ -0,0 +1,37 @@
+import dataclasses
+from typing import Optional
+
+
+@dataclasses.dataclass
+class ListingMetricsInfo:
+  run_id: str
+  first_iteration_metrics_gcs_list_time_avg: Optional[float]
+  first_iteration_metrics_fuse_list_time_avg: Optional[float]
+  first_iteration_metrics_gcs_subdir_list_time_avg: Optional[float]
+  first_iteration_metrics_gcs_list_time_p50: Optional[float]
+  first_iteration_metrics_fuse_list_time_p50: Optional[float]
+  first_iteration_metrics_gcs_subdir_list_time_p50: Optional[float]
+  first_iteration_metrics_gcs_list_time_p90: Optional[float]
+  first_iteration_metrics_fuse_list_time_p90: Optional[float]
+  first_iteration_metrics_gcs_subdir_list_time_p90: Optional[float]
+  first_iteration_metrics_gcs_list_time_p99: Optional[float]
+  first_iteration_metrics_fuse_list_time_p99: Optional[float]
+  first_iteration_metrics_gcs_subdir_list_time_p99: Optional[float]
+  first_iteration_metrics_gcs_list_time_p100: Optional[float]
+  first_iteration_metrics_fuse_list_time_p100: Optional[float]
+  first_iteration_metrics_gcs_subdir_list_time_p100: Optional[float]
+  subsq_iteration_metrics_gcs_list_time_avg: Optional[float]
+  subsq_iteration_metrics_fuse_list_time_avg: Optional[float]
+  subsq_iteration_metrics_gcs_subdir_list_time_avg: Optional[float]
+  subsq_iteration_metrics_gcs_list_time_p50: Optional[float]
+  subsq_iteration_metrics_fuse_list_time_p50: Optional[float]
+  subsq_iteration_metrics_gcs_subdir_list_time_p50: Optional[float]
+  subsq_iteration_metrics_gcs_list_time_p90: Optional[float]
+  subsq_iteration_metrics_fuse_list_time_p90: Optional[float]
+  subsq_iteration_metrics_gcs_subdir_list_time_p90: Optional[float]
+  subsq_iteration_metrics_gcs_list_time_p99: Optional[float]
+  subsq_iteration_metrics_fuse_list_time_p99: Optional[float]
+  subsq_iteration_metrics_gcs_subdir_list_time_p99: Optional[float]
+  subsq_iteration_metrics_gcs_list_time_p100: Optional[float]
+  subsq_iteration_metrics_fuse_list_time_p100: Optional[float]
+  subsq_iteration_metrics_gcs_subdir_list_time_p100: Optional[float]
diff --git a/benchmarks/benchmark_db_writer/schema/supplemental_metrics/network_metrics_schema.py b/benchmarks/benchmark_db_writer/schema/supplemental_metrics/network_metrics_schema.py
new file mode 100644
index 0000000000..2d40af3759
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/schema/supplemental_metrics/network_metrics_schema.py
@@ -0,0 +1,22 @@
+import dataclasses
+import datetime
+from typing import Optional
+from benchmarks.benchmark_db_writer import bigquery_types
+
+
+@dataclasses.dataclass
+class NetworkMetricsInfo:
+  run_id: str
+  server_max_egress: Optional[float] = None
+  server_avg_egress: Optional[float] = None
+  server_max_ingress: Optional[float] = None
+  server_avg_ingress: Optional[float] = None
+  server_max_qps: Optional[float] = None
+  server_avg_qps: Optional[float] = None
+  client_max_egress: Optional[float] = None
+  client_avg_egress: Optional[float] = None
+  client_max_ingress: Optional[float] = None
+  client_avg_ingress: Optional[float] = None
+  client_max_qps: Optional[float] = None
+  client_avg_qps: Optional[float] = None
+  update_timestamp: Optional[bigquery_types.TimeStamp] = datetime.datetime.now()
diff --git a/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/__init__.py b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/hardware_info_schema.py b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/hardware_info_schema.py
new file mode 100644
index 0000000000..f4c34f3bfe
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/hardware_info_schema.py
@@ -0,0 +1,22 @@
+import dataclasses
+import datetime
+from typing import Optional
+from benchmarks.benchmark_db_writer import bigquery_types
+
+
+@dataclasses.dataclass
+class HardwareInfo:
+  hardware_id: str
+  gcp_accelerator_name: str
+  chip_name: str
+  bf_16_tflops: int
+  memory: float
+  hardware_type: str
+  provider_name: str
+  update_person_ldap: str
+  chips_per_node: Optional[int] = None
+  description: Optional[str] = ""
+  other: Optional[str] = ""
+  update_timestamp: Optional[bigquery_types.TimeStamp] = datetime.datetime.now()
+  host_vcpus: Optional[int] = None
+  host_memory: Optional[int] = None  # host_memory in GB
diff --git a/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/microbenchmark_run_summary_schema.py b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/microbenchmark_run_summary_schema.py
new file mode 100644
index 0000000000..b76a96725b
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/microbenchmark_run_summary_schema.py
@@ -0,0 +1,54 @@
+import dataclasses
+import datetime
+from typing import Optional
+from benchmarks.benchmark_db_writer import bigquery_types
+
+# Table: microbenchmark_run_summary
+
+
+@dataclasses.dataclass
+class MicrobenchmarkRunSummarySchema:
+  run_id: str
+
+  # Unique workload_id to map to microbenchmark_workload_info table
+  workload_id: str
+  workload_parameters: str
+  run_date: bigquery_types.TimeStamp
+
+  # Foreign key to join with hardware info
+  hardware_id: str
+  hardware_num_chips: int
+
+  result_success: bool
+
+  configs_num_iterations: int
+
+  benchmarker_ldap: str
+
+  metrics_type: str
+  metrics_unit: str
+  metrics_p50: float
+  metrics_p90: float
+  metrics_p99: float
+  update_person_ldap: str
+
+  hardware_num_nodes: Optional[int] = None
+  hardware_num_slices: Optional[str] = None
+  configs_other: Optional[str] = None
+
+  logs_artifact_directory: Optional[str] = None
+  run_source: str = "manual"
+  run_type: str = "perf_optimization"
+
+  metrics_avg: Optional[float] = None
+  metrics_stdev: Optional[float] = None
+  metrics_other: Optional[str] = None
+
+  result_error: Optional[str] = None
+
+  logs_profile: Optional[str] = None
+  logs_cloud_logs: Optional[str] = None
+  logs_comments: Optional[str] = None
+  logs_other: Optional[str] = None
+
+  update_timestamp: Optional[bigquery_types.TimeStamp] = datetime.datetime.now()
diff --git a/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/microbenchmark_workload_info_schema.py b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/microbenchmark_workload_info_schema.py
new file mode 100644
index 0000000000..6421b1284d
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/microbenchmark_workload_info_schema.py
@@ -0,0 +1,16 @@
+import dataclasses
+import datetime
+from typing import Optional
+from benchmarks.benchmark_db_writer import bigquery_types
+
+
+# Table microbenchmark_workload_info
+
+
+@dataclasses.dataclass
+class MicrobenchmarkWorkloadInfo:
+  workload_id: str
+  update_person_ldap: str
+  description: Optional[str] = ""
+  update_timestamp: Optional[bigquery_types.TimeStamp] = datetime.datetime.now()
+  display_name: Optional[str] = None
diff --git a/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/model_info_schema.py b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/model_info_schema.py
new file mode 100644
index 0000000000..fb340c602b
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/model_info_schema.py
@@ -0,0 +1,16 @@
+import dataclasses
+import datetime
+from typing import Optional
+from benchmarks.benchmark_db_writer import bigquery_types
+
+
+@dataclasses.dataclass
+class ModelInfo:
+  model_id: str
+  name: str
+  variant: str
+  parameter_size_in_billions: float
+  update_person_ldap: str
+  description: Optional[str] = ""
+  details: Optional[str] = ""
+  update_timestamp: Optional[bigquery_types.TimeStamp] = datetime.datetime.now()
diff --git a/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/software_info_schema.py b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/software_info_schema.py
new file mode 100644
index 0000000000..3ada6f2649
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/software_info_schema.py
@@ -0,0 +1,16 @@
+import dataclasses
+import datetime
+from typing import Optional
+from benchmarks.benchmark_db_writer import bigquery_types
+
+
+@dataclasses.dataclass
+class SoftwareInfo:
+  software_id: str
+  ml_framework: str
+  os: str
+  training_framework: str
+  update_person_ldap: str
+  compiler: Optional[str] = None
+  description: Optional[str] = ""
+  update_timestamp: Optional[bigquery_types.TimeStamp] = datetime.datetime.now()
diff --git a/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/storage_info_schema.py b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/storage_info_schema.py
new file mode 100644
index 0000000000..cdbf759098
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/storage_info_schema.py
@@ -0,0 +1,13 @@
+import dataclasses
+from typing import Optional
+from benchmarks.benchmark_db_writer import bigquery_types
+
+
+@dataclasses.dataclass
+class StorageInfo:
+  storage_id: str
+  storage_product: Optional[str]
+  description: Optional[str]
+  config: Optional[dict]
+  update_person_ldap: Optional[str]
+  update_timestamp: Optional[bigquery_types.TimeStamp]
diff --git a/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/workload_benchmark_v2_schema.py b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/workload_benchmark_v2_schema.py
new file mode 100644
index 0000000000..8edb6381c5
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/schema/workload_benchmark_v2/workload_benchmark_v2_schema.py
@@ -0,0 +1,101 @@
+import dataclasses
+import datetime
+from typing import Optional
+from benchmarks.benchmark_db_writer import bigquery_types
+
+
+@dataclasses.dataclass
+class WorkloadBenchmarkV2Schema:
+  run_id: str
+
+  # Unique model id to map model info table
+  model_id: str
+
+  # Foreign key to join with software info
+  software_id: str
+  # Foreign key to join with hardware info
+  hardware_id: str
+  hardware_num_chips: int
+
+  result_success: bool
+
+  update_person_ldap: str
+  configs_framework: Optional[str] = None
+  configs_container_version: Optional[str] = None
+  logs_artifact_directory: Optional[str] = None
+  configs_env: Optional[str] = None
+  hardware_num_nodes: Optional[int] = None
+
+  # Foreign key to join with storage info
+  storage_id: Optional[str] = None
+
+  run_source: str = "manual"
+  is_run_externally_visible: bool = False
+  run_type: str = "perf_optimization"
+
+  run_release_status: Optional[str] = "local"
+  k8_jobset_yaml_file_path: Optional[str] = None
+
+  benchmark_type: Optional[str] = None
+
+  experiment_id: Optional[str] = None
+
+  workload_gbs: Optional[int] = None
+  workload_mbs: Optional[int] = None
+  workload_precision: Optional[str] = None
+  workload_optimizer: Optional[str] = None
+  workload_others: Optional[str] = None
+  workload_manager: Optional[str] = None
+  workload_type: str = "training"
+  workload_sequence_length: Optional[int] = None
+
+  metrics_step_time: Optional[float] = None
+  metrics_mfu: Optional[float] = None
+  metrics_tokens_per_second: Optional[float] = None
+  metrics_e2e_time: Optional[float] = None
+  metrics_num_steps: Optional[int] = None
+  metrics_other: Optional[str] = None
+  metrics_tflops_per_second: Optional[float] = None
+
+  hardware_num_superblocks: Optional[str] = None
+  hardware_num_slices: Optional[int] = None
+  hardware_topology: Optional[str] = None
+  hardware_num_cores: Optional[int] = None
+  result_error: Optional[str] = None
+  hardware_nccl_driver_nickname: Optional[str] = None
+
+  configs_xla_flags: Optional[str] = None
+  configs_dataset: Optional[str] = None
+  configs_reviewer: Optional[str] = None
+  configs_other: Optional[str] = None
+
+  logs_profile: Optional[str] = None
+  logs_cloud_logs: Optional[str] = None
+  logs_comments: Optional[str] = None
+  logs_other: Optional[str] = None
+
+  checkpointing_async: Optional[bool] = None
+  checkpointing_interval_every_n_steps: Optional[int] = None
+  checkpointing_size_in_gibs: Optional[float] = None
+  checkpointing_individual_file_size: Optional[int] = None
+  checkpointing_file_format: Optional[str] = None
+
+  max_epochs: Optional[int] = None
+  max_steps: Optional[int] = None
+  training_dataset_samples: Optional[int] = None
+  data_loader_num_workers: Optional[int] = None
+  data_loader_prefetch_factor: Optional[int] = None
+  training_dataset_file_format: Optional[str] = None
+
+  start_time: Optional[bigquery_types.TimeStamp] = None
+  end_time: Optional[bigquery_types.TimeStamp] = None
+
+  gcs_metrics_bucket: Optional[str] = None
+  gcsfuse_csi_driver: Optional[str] = None
+  cloud_region: Optional[str] = None
+  source_bucket: Optional[str] = None
+
+  cluster_name: Optional[str] = None
+
+  reviewer_ldap: str = ""
+  # default_factory so each row gets a fresh timestamp, not the import-time one.
+  update_timestamp: Optional[bigquery_types.TimeStamp] = dataclasses.field(default_factory=datetime.datetime.now)
diff --git a/benchmarks/benchmark_db_writer/schema_generator/external_visibility_tables/externally_visible_runs.json b/benchmarks/benchmark_db_writer/schema_generator/external_visibility_tables/externally_visible_runs.json
new file mode 100644
index 0000000000..93f99832bc
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/schema_generator/external_visibility_tables/externally_visible_runs.json
@@ -0,0 +1,30 @@
+[
+  {
+    "name": "update_timestamp",
+    "mode": "NULLABLE",
+    "type": "TIMESTAMP",
+    "description": "",
+    "fields": []
+  },
+  {
+    "name": "run_id",
+    "mode": "REQUIRED",
+    "type": "STRING",
+    "description": "",
+    "fields": []
+  },
+  {
+    "name": "reviewer_ldap",
+    "mode": "REQUIRED",
+    "type": "STRING",
+    "description": "",
+    "fields": []
+  },
+  {
+    "name": "update_person_ldap",
+    "mode": "REQUIRED",
+    "type": "STRING",
+    "description": "",
+    "fields": []
+  }
+]
diff --git a/benchmarks/benchmark_db_writer/schema_generator/info_tables/hardware_info.json
b/benchmarks/benchmark_db_writer/schema_generator/info_tables/hardware_info.json new file mode 100644 index 0000000000..cb7bbae3cf --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema_generator/info_tables/hardware_info.json @@ -0,0 +1,100 @@ +[ + { + "name": "hardware_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "gcp_accelerator_name", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "chip_name", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "bf_16_tflops", + "mode": "REQUIRED", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "memory", + "mode": "REQUIRED", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "hardware_type", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "provider_name", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "update_person_ldap", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "chips_per_node", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "description", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "other", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "update_timestamp", + "mode": "", + "type": "TIMESTAMP", + "description": "", + "fields": [] + }, + { + "name": "host_memory", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "host_vcpus", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + } +] \ No newline at end of file diff --git a/benchmarks/benchmark_db_writer/schema_generator/info_tables/microbenchmark_workload_info.json b/benchmarks/benchmark_db_writer/schema_generator/info_tables/microbenchmark_workload_info.json new file mode 100644 index 0000000000..f7b9111b06 --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema_generator/info_tables/microbenchmark_workload_info.json @@ -0,0 +1,39 @@ +[ + { + "name": "workload_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "description", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "update_person_ldap", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [], + "defaultValueExpression": "SESSION_USER()" + }, + { + "name": "update_timestamp", + "mode": "NULLABLE", + "type": "TIMESTAMP", + "description": "", + "fields": [], + "defaultValueExpression": "CURRENT_TIMESTAMP()" + }, + { + "name": "display_name", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + } +] diff --git a/benchmarks/benchmark_db_writer/schema_generator/info_tables/model_info.json b/benchmarks/benchmark_db_writer/schema_generator/info_tables/model_info.json new file mode 100644 index 0000000000..bd229cc690 --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema_generator/info_tables/model_info.json @@ -0,0 +1,58 @@ +[ + { + "name": "model_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "name", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "variant", + "mode": "REQUIRED", + "type": "STRING", + "description": 
"", + "fields": [] + }, + { + "name": "parameter_size_in_billions", + "mode": "REQUIRED", + "type": "NUMERIC", + "description": "", + "fields": [] + }, + { + "name": "update_person_ldap", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "description", + "mode": "", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "details", + "mode": "", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "update_timestamp", + "mode": "", + "type": "TIMESTAMP", + "description": "", + "fields": [] + } +] diff --git a/benchmarks/benchmark_db_writer/schema_generator/info_tables/software_info.json b/benchmarks/benchmark_db_writer/schema_generator/info_tables/software_info.json new file mode 100644 index 0000000000..1bc69bbc38 --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema_generator/info_tables/software_info.json @@ -0,0 +1,58 @@ +[ + { + "name": "software_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "ml_framework", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "os", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "training_framework", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "update_person_ldap", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "compiler", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "description", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "update_timestamp", + "mode": "", + "type": "TIMESTAMP", + "description": "", + "fields": [] + } +] diff --git a/benchmarks/benchmark_db_writer/schema_generator/info_tables/storage_info.json b/benchmarks/benchmark_db_writer/schema_generator/info_tables/storage_info.json new file mode 100644 index 0000000000..6ebb4ccea0 --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema_generator/info_tables/storage_info.json @@ -0,0 +1,44 @@ +[ + { + "name": "storage_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "storage_product", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "config", + "mode": "NULLABLE", + "type": "JSON", + "description": "", + "fields": [] + }, + { + "name": "description", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "update_person_ldap", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "update_timestamp", + "mode": "NULLABLE", + "type": "TIMESTAMP", + "description": "", + "fields": [] + } +] diff --git a/benchmarks/benchmark_db_writer/schema_generator/run_summary_tables/microbenchmark_run_summary.json b/benchmarks/benchmark_db_writer/schema_generator/run_summary_tables/microbenchmark_run_summary.json new file mode 100644 index 0000000000..755e9cc3bb --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema_generator/run_summary_tables/microbenchmark_run_summary.json @@ -0,0 +1,212 @@ +[ + { + "name": "run_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "workload_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "workload_parameters", + "mode": "NULLABLE", + "type": 
"STRING", + "description": "", + "fields": [] + }, + { + "name": "run_date", + "mode": "NULLABLE", + "type": "TIMESTAMP", + "description": "", + "fields": [] + }, + { + "name": "hardware_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "hardware_num_chips", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "hardware_num_nodes", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "hardware_num_slices", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "result_success", + "mode": "NULLABLE", + "type": "BOOLEAN", + "description": "", + "fields": [] + }, + { + "name": "configs_num_iterations", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "configs_other", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "logs_artifact_directory", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "benchmarker_ldap", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "run_source", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "run_type", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "metrics_type", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "metrics_unit", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "metrics_p50", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "metrics_p90", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "metrics_p99", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "metrics_avg", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "metrics_stdev", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "metrics_other", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "result_error", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "logs_profile", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "logs_cloud_logs", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "logs_comments", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "logs_other", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "update_person_ldap", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "update_timestamp", + "mode": "NULLABLE", + "type": "TIMESTAMP", + "description": "", + "fields": [] + } +] diff --git a/benchmarks/benchmark_db_writer/schema_generator/run_summary_tables/run_summary.json b/benchmarks/benchmark_db_writer/schema_generator/run_summary_tables/run_summary.json new file mode 100644 index 0000000000..5c93315621 --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema_generator/run_summary_tables/run_summary.json @@ -0,0 +1,485 @@ +[ + { + "name": "run_id", + "mode": 
"REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "model_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "workload_gbs", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "workload_precision", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "workload_optimizer", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "workload_others", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "workload_manager", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "software_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "hardware_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "hardware_num_chips", + "mode": "REQUIRED", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "hardware_num_nodes", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "result_success", + "mode": "REQUIRED", + "type": "BOOLEAN", + "description": "", + "fields": [] + }, + { + "name": "metrics_step_time", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "metrics_mfu", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "metrics_tokens_per_second", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "metrics_e2e_time", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "metrics_num_steps", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "metrics_other", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "configs_framework", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "configs_env", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "configs_container_version", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "logs_artifact_directory", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "update_person_ldap", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "run_source", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "is_run_externally_visible", + "mode": "REQUIRED", + "type": "BOOLEAN", + "description": "", + "fields": [] + }, + { + "name": "run_type", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "workload_type", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "workload_sequence_length", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "hardware_num_superblocks", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "hardware_num_slices", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "hardware_topology", + "mode": "NULLABLE", + "type": 
"STRING", + "description": "", + "fields": [] + }, + { + "name": "hardware_num_cores", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "result_error", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "hardware_nccl_driver_nickname", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "configs_xla_flags", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "configs_dataset", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "configs_reviewer", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "configs_other", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "logs_profile", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "logs_cloud_logs", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "logs_comments", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "logs_other", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "reviewer_ldap", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "run_release_status", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "experiment_id", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "update_timestamp", + "mode": "", + "type": "TIMESTAMP", + "description": "", + "fields": [] + }, + { + "name": "storage_id", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "benchmark_type", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "gcs_metrics_bucket", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "cloud_region", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "source_bucket", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "cluster_name", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "checkpointing_async", + "mode": "NULLABLE", + "type": "BOOLEAN", + "description": "", + "fields": [] + }, + { + "name": "checkpointing_interval_every_n_steps", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "checkpointing_size_in_gibs", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "checkpointing_individual_file_size", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "checkpointing_file_format", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "start_time", + "mode": "NULLABLE", + "type": "TIMESTAMP", + "description": "", + "fields": [] + }, + { + "name": "end_time", + "mode": "NULLABLE", + "type": "TIMESTAMP", + "description": "", + "fields": [] + }, + { + "name": "k8_jobset_yaml_file_path", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "workload_mbs", + "mode": "NULLABLE", + "type": "INTEGER", + 
"description": "", + "fields": [] + }, + { + "name": "max_epochs", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "max_steps", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "training_dataset_samples", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "data_loader_num_workers", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "data_loader_prefetch_factor", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "training_dataset_file_format", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "gcsfuse_csi_driver", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "metrics_tflops_per_second", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + } +] diff --git a/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/checkpointing_metrics.json b/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/checkpointing_metrics.json new file mode 100644 index 0000000000..f75dbf11bb --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/checkpointing_metrics.json @@ -0,0 +1,198 @@ +[ + { + "name": "run_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "restore_time_p50", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "restore_time_p90", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "restore_time_p99", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "restore_time_p100", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "restore_time_max", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "restore_time_min", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "restore_time_avg", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "restore_time_stddev", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "restore_time_initial", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "write_time_p50", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "write_time_p90", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "write_time_p99", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "write_time_p100", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "write_time_max", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "write_time_min", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "write_time_avg", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "write_time_stddev", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "accelerator_to_cpu_time_max", + "mode": "NULLABLE", + 
"type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "accelerator_to_cpu_time_min", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "accelerator_to_cpu_time_avg", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "storage_save_time_max", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "storage_save_time_min", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "storage_save_time_avg", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "num_restore_datapoints", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "num_write_datapoints", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "update_timestamp", + "mode": "NULLABLE", + "type": "TIMESTAMP", + "description": "", + "fields": [] + }, + { + "name": "additional_metrics", + "mode": "NULLABLE", + "type": "JSON", + "description": "", + "fields": [] + } +] diff --git a/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/data_loading_metrics.json b/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/data_loading_metrics.json new file mode 100644 index 0000000000..7b35d91f81 --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/data_loading_metrics.json @@ -0,0 +1,48 @@ +[ + { + "mode": "REQUIRED", + "name": "run_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "data_loading_tokens_per_sec_p50", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "data_loading_tokens_per_sec_p90", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "data_loading_tokens_per_sec_p99", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "data_loading_tokens_per_sec_p100", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "accelerator_blocked_time", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "accelerator_blocked_percent", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "update_timestamp", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "additional_metrics", + "type": "JSON" + } +] + diff --git a/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/listing_metrics.json b/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/listing_metrics.json new file mode 100644 index 0000000000..44e7afccdc --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/listing_metrics.json @@ -0,0 +1,219 @@ +[ + { + "name": "run_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_gcs_list_time_avg", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_fuse_list_time_avg", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_gcs_subdir_list_time_avg", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_gcs_list_time_p50", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_fuse_list_time_p50", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": 
"first_iteration_metrics_gcs_subdir_list_time_p50", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_gcs_list_time_p90", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_fuse_list_time_p90", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_gcs_subdir_list_time_p90", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_gcs_list_time_p99", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_fuse_list_time_p99", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_gcs_subdir_list_time_p99", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_gcs_list_time_p100", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_fuse_list_time_p100", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "first_iteration_metrics_gcs_subdir_list_time_p100", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_gcs_list_time_avg", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_fuse_list_time_avg", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_gcs_subdir_list_time_avg", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_gcs_list_time_p50", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_fuse_list_time_p50", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_gcs_subdir_list_time_p50", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_gcs_list_time_p90", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_fuse_list_time_p90", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_gcs_subdir_list_time_p90", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_gcs_list_time_p99", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_fuse_list_time_p99", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_gcs_subdir_list_time_p99", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_gcs_list_time_p100", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_fuse_list_time_p100", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "subsq_iteration_metrics_gcs_subdir_list_time_p100", + "mode": "NULLABLE", + "type": "FLOAT", + 
"description": "", + "fields": [] + } +] diff --git a/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/network_metrics.json b/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/network_metrics.json new file mode 100644 index 0000000000..3c2e17b25e --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/network_metrics.json @@ -0,0 +1,93 @@ +[ + { + "name": "run_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "server_max_egress", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "server_avg_egress", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "server_max_ingress", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "server_avg_ingress", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "server_max_qps", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "server_avg_qps", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "client_max_egress", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "client_avg_egress", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "client_max_ingress", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "client_avg_ingress", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "client_max_qps", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "client_avg_qps", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + } +] diff --git a/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/storage_info.json b/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/storage_info.json new file mode 100644 index 0000000000..03ff025f77 --- /dev/null +++ b/benchmarks/benchmark_db_writer/schema_generator/supplemental_metrics/storage_info.json @@ -0,0 +1,47 @@ +[ + { + "name": "storage_id", + "mode": "REQUIRED", + "type": "STRING", + "description": "", + "fields": [], + "defaultValueExpression": "GENERATE_UUID()" + }, + { + "name": "storage_product", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "config", + "mode": "NULLABLE", + "type": "JSON", + "description": "", + "fields": [] + }, + { + "name": "description", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "update_person_ldap", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [], + "defaultValueExpression": "SESSION_USER()" + }, + { + "name": "update_timestamp", + "mode": "NULLABLE", + "type": "TIMESTAMP", + "description": "", + "fields": [], + "defaultValueExpression": "CURRENT_TIMESTAMP()" + } +] diff --git a/benchmarks/benchmark_db_writer/supplemental_metrics_writer/__init__.py b/benchmarks/benchmark_db_writer/supplemental_metrics_writer/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/benchmarks/benchmark_db_writer/supplemental_metrics_writer/checkpointing_metrics_writer.py b/benchmarks/benchmark_db_writer/supplemental_metrics_writer/checkpointing_metrics_writer.py new file mode 100644 index 
0000000000..8b3b976aa8
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/supplemental_metrics_writer/checkpointing_metrics_writer.py
@@ -0,0 +1,280 @@
+"""Update the checkpointing_metrics table of the benchmark dataset."""
+
+import json
+import logging
+from typing import Sequence
+
+from absl import app
+from absl import flags
+from benchmarks.benchmark_db_writer import bq_writer_utils
+from benchmarks.benchmark_db_writer.schema.supplemental_metrics import (
+    checkpointing_metrics_schema,
+)
+from benchmarks.benchmark_db_writer.schema.workload_benchmark_v2 import (
+    workload_benchmark_v2_schema,
+)
+from benchmarks.benchmark_db_writer.supplemental_metrics_writer import common_flags
+
+_RESTORE_TIME_P50 = flags.DEFINE_float(
+    "restore_time_p50",
+    None,
+    "50 percentile of the time it takes to restore a checkpoint from storage," " in seconds.",
+)
+_RESTORE_TIME_P90 = flags.DEFINE_float(
+    "restore_time_p90",
+    None,
+    "90 percentile of the time it takes to restore a checkpoint from storage," " in seconds.",
+)
+_RESTORE_TIME_P99 = flags.DEFINE_float(
+    "restore_time_p99",
+    None,
+    "99 percentile of the time it takes to restore a checkpoint from storage," " in seconds.",
+)
+_RESTORE_TIME_P100 = flags.DEFINE_float(
+    "restore_time_p100",
+    None,
+    "100 percentile of the time it takes to restore a checkpoint from storage," " in seconds.",
+)
+_RESTORE_TIME_MAX = flags.DEFINE_float(
+    "restore_time_max",
+    None,
+    "The max time it takes to restore a checkpoint from storage, in seconds.",
+)
+_RESTORE_TIME_MIN = flags.DEFINE_float(
+    "restore_time_min",
+    None,
+    "The min time it takes to restore a checkpoint from storage, in seconds.",
+)
+_RESTORE_TIME_AVG = flags.DEFINE_float(
+    "restore_time_avg",
+    None,
+    "The average time it takes to restore a checkpoint from storage," " in seconds.",
+)
+_RESTORE_TIME_STDDEV = flags.DEFINE_float(
+    "restore_time_stddev",
+    None,
+    "The standard deviation of the time it takes to restore a checkpoint" " from storage, in seconds.",
+)
+_RESTORE_TIME_INITIAL = flags.DEFINE_float(
+    "restore_time_initial",
+    None,
+    "The time it takes to restore a checkpoint from storage for the first time"
+    " (the initial restore may be cached), in seconds.",
+)
+_WRITE_TIME_P50 = flags.DEFINE_float(
+    "write_time_p50",
+    None,
+    "50 percentile of the time it takes to write a checkpoint to storage, in" " seconds.",
+)
+_WRITE_TIME_P90 = flags.DEFINE_float(
+    "write_time_p90",
+    None,
+    "90 percentile of the time it takes to write a checkpoint to storage, in" " seconds.",
+)
+_WRITE_TIME_P99 = flags.DEFINE_float(
+    "write_time_p99",
+    None,
+    "99 percentile of the time it takes to write a checkpoint to storage, in" " seconds.",
+)
+_WRITE_TIME_P100 = flags.DEFINE_float(
+    "write_time_p100",
+    None,
+    "100 percentile of the time it takes to write a checkpoint to storage, in" " seconds.",
+)
+_WRITE_TIME_MAX = flags.DEFINE_float(
+    "write_time_max",
+    None,
+    "The max time it takes to write a checkpoint, in seconds. We measure"
+    " the time elapsed from when a checkpoint begins blocking training until"
+    " all checkpoint data is successfully saved to storage.",
+)
+_WRITE_TIME_MIN = flags.DEFINE_float(
+    "write_time_min",
+    None,
+    "The min time it takes to write a checkpoint, in seconds. We measure"
+    " the time elapsed from when a checkpoint begins blocking training until"
+    " all checkpoint data is successfully saved to storage.",
+)
+_WRITE_TIME_AVG = flags.DEFINE_float(
+    "write_time_avg",
+    None,
+    "The average time it takes to write a checkpoint, in seconds. We measure"
+    " the time elapsed from when a checkpoint begins blocking training until"
+    " all checkpoint data is successfully saved to storage.",
+)
+_WRITE_TIME_STDDEV = flags.DEFINE_float(
+    "write_time_stddev",
+    None,
+    "The standard deviation of the time it takes to write a checkpoint, in"
+    " seconds, measured over the same blocking interval as the other"
+    " write-time flags.",
+)
+_ACCELERATOR_TO_CPU_TIME_MAX = flags.DEFINE_float(
+    "accelerator_to_cpu_time_max",
+    None,
+    "The max time it takes to transfer a checkpoint from GPU to CPU memory," " in seconds.",
+)
+_ACCELERATOR_TO_CPU_TIME_MIN = flags.DEFINE_float(
+    "accelerator_to_cpu_time_min",
+    None,
+    "The min time it takes to transfer a checkpoint from GPU to CPU memory," " in seconds.",
+)
+_ACCELERATOR_TO_CPU_TIME_AVG = flags.DEFINE_float(
+    "accelerator_to_cpu_time_avg",
+    None,
+    "The average time it takes to transfer a checkpoint from GPU to CPU memory," " in seconds.",
+)
+_STORAGE_SAVE_TIME_MAX = flags.DEFINE_float(
+    "storage_save_time_max",
+    None,
+    "The max time it takes to write a checkpoint from CPU memory to storage," " in seconds.",
+)
+_STORAGE_SAVE_TIME_MIN = flags.DEFINE_float(
+    "storage_save_time_min",
+    None,
+    "The min time it takes to write a checkpoint from CPU memory to storage," " in seconds.",
+)
+_STORAGE_SAVE_TIME_AVG = flags.DEFINE_float(
+    "storage_save_time_avg",
+    None,
+    "The average time it takes to write a checkpoint from CPU memory to" " storage, in seconds.",
+)
+_NUM_RESTORE_DATAPOINTS = flags.DEFINE_integer("num_restore_datapoints", None, "The number of restore datapoints.")
+_NUM_WRITE_DATAPOINTS = flags.DEFINE_integer("num_write_datapoints", None, "The number of write datapoints.")
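+
+# All of the flags above carry statistics that were already aggregated
+# upstream, typically by the benchmark harness that ran the workload; this
+# script validates the run id and appends one pre-aggregated row rather than
+# computing percentiles from raw timings itself.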
+
+logging.basicConfig(
+    format="%(asctime)s %(levelname)-8s %(message)s",
+    level=logging.INFO,
+    datefmt="%Y-%m-%d %H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
+
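+# Appends a single row to the checkpointing_metrics table, but only after the
+# run id has been validated against the run_summary table; a failed validation
+# raises ValueError so supplemental metrics cannot reference an unknown run.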
+def write_checkpointing_metrics(
+    project,
+    dataset,
+    table,
+    dataclass_type,
+    run_id,
+    restore_time_p50=None,
+    restore_time_p90=None,
+    restore_time_p99=None,
+    restore_time_p100=None,
+    restore_time_max=None,
+    restore_time_min=None,
+    restore_time_avg=None,
+    restore_time_stddev=None,
+    restore_time_initial=None,
+    write_time_p50=None,
+    write_time_p90=None,
+    write_time_p99=None,
+    write_time_p100=None,
+    write_time_max=None,
+    write_time_min=None,
+    write_time_avg=None,
+    write_time_stddev=None,
+    accelerator_to_cpu_time_max=None,
+    accelerator_to_cpu_time_min=None,
+    accelerator_to_cpu_time_avg=None,
+    storage_save_time_max=None,
+    storage_save_time_min=None,
+    storage_save_time_avg=None,
+    num_restore_datapoints=None,
+    num_write_datapoints=None,
+    additional_metrics=None,
+    is_test=False,
+):
+
+  if bq_writer_utils.validate_id(
+      logger,
+      run_id,
+      "run_summary",
+      "run_id",
+      workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema,
+      is_test,
+  ):
+
+    writer = bq_writer_utils.create_bq_writer_object(
+        project=project,
+        dataset=dataset,
+        table=table,
+        dataclass_type=dataclass_type,
+    )
+
+    checkpointing_metrics_data = checkpointing_metrics_schema.CheckpointingMetricsInfo(
+        run_id=run_id,
+        restore_time_p50=restore_time_p50,
+        restore_time_p90=restore_time_p90,
+        restore_time_p99=restore_time_p99,
+        restore_time_p100=restore_time_p100,
+        restore_time_max=restore_time_max,
+        restore_time_min=restore_time_min,
+        restore_time_avg=restore_time_avg,
+        restore_time_stddev=restore_time_stddev,
+        restore_time_initial=restore_time_initial,
+        write_time_p50=write_time_p50,
+        write_time_p90=write_time_p90,
+        write_time_p99=write_time_p99,
+        write_time_p100=write_time_p100,
+        write_time_max=write_time_max,
+        write_time_min=write_time_min,
+        write_time_avg=write_time_avg,
+        write_time_stddev=write_time_stddev,
+        accelerator_to_cpu_time_max=accelerator_to_cpu_time_max,
+        accelerator_to_cpu_time_min=accelerator_to_cpu_time_min,
+        accelerator_to_cpu_time_avg=accelerator_to_cpu_time_avg,
+        storage_save_time_max=storage_save_time_max,
+        storage_save_time_min=storage_save_time_min,
+        storage_save_time_avg=storage_save_time_avg,
+        num_restore_datapoints=num_restore_datapoints,
+        num_write_datapoints=num_write_datapoints,
+        additional_metrics=json.loads(additional_metrics) if additional_metrics is not None else None,
+    )
+
+    logging.info("Writing Data %s to %s table.", checkpointing_metrics_data, table)
+    writer.write([checkpointing_metrics_data])
+
+  else:
+    raise ValueError("Run id %s is not present in the run_summary table, so checkpointing metrics were not written." % run_id)
+
+
+def main(_: Sequence[str]):
+  write_checkpointing_metrics(
+      project=("supercomputer-testing" if common_flags.IS_TEST.value else "ml-workload-benchmarks"),
+      dataset=("mantaray_v2" if common_flags.IS_TEST.value else "benchmark_dataset_v2"),
+      table="checkpointing_metrics",
+      dataclass_type=checkpointing_metrics_schema.CheckpointingMetricsInfo,
+      run_id=common_flags.RUN_ID.value,
+      restore_time_p50=_RESTORE_TIME_P50.value,
+      restore_time_p90=_RESTORE_TIME_P90.value,
+      restore_time_p99=_RESTORE_TIME_P99.value,
+      restore_time_p100=_RESTORE_TIME_P100.value,
+      restore_time_max=_RESTORE_TIME_MAX.value,
+      restore_time_min=_RESTORE_TIME_MIN.value,
+      restore_time_avg=_RESTORE_TIME_AVG.value,
+      restore_time_initial=_RESTORE_TIME_INITIAL.value,
+      restore_time_stddev=_RESTORE_TIME_STDDEV.value,
+      write_time_p50=_WRITE_TIME_P50.value,
+      write_time_p90=_WRITE_TIME_P90.value,
+      write_time_p99=_WRITE_TIME_P99.value,
+      write_time_p100=_WRITE_TIME_P100.value,
+      write_time_max=_WRITE_TIME_MAX.value,
+      write_time_min=_WRITE_TIME_MIN.value,
+      write_time_avg=_WRITE_TIME_AVG.value,
+      write_time_stddev=_WRITE_TIME_STDDEV.value,
+      accelerator_to_cpu_time_max=_ACCELERATOR_TO_CPU_TIME_MAX.value,
+      accelerator_to_cpu_time_min=_ACCELERATOR_TO_CPU_TIME_MIN.value,
+      accelerator_to_cpu_time_avg=_ACCELERATOR_TO_CPU_TIME_AVG.value,
+      storage_save_time_max=_STORAGE_SAVE_TIME_MAX.value,
+      storage_save_time_min=_STORAGE_SAVE_TIME_MIN.value,
+      storage_save_time_avg=_STORAGE_SAVE_TIME_AVG.value,
+      num_restore_datapoints=_NUM_RESTORE_DATAPOINTS.value,
+      num_write_datapoints=_NUM_WRITE_DATAPOINTS.value,
+      additional_metrics=common_flags.ADDITIONAL_METRICS.value,
+      is_test=common_flags.IS_TEST.value,
+  )
+
+
+if __name__ == "__main__":
+  app.run(main)
diff --git a/benchmarks/benchmark_db_writer/supplemental_metrics_writer/common_flags.py b/benchmarks/benchmark_db_writer/supplemental_metrics_writer/common_flags.py
new file mode 100644
index 0000000000..844c266e1a
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/supplemental_metrics_writer/common_flags.py
@@ -0,0 +1,15 @@
+"""Common flags that are shared among metrics writers."""
+
+from absl import flags
+
+RUN_ID = flags.DEFINE_string("run_id", None, "The ID of the benchmark run.")
+IS_TEST = flags.DEFINE_bool(
+    "is_test",
+    False,
+    "True to write the metrics to the test project.",
+)
+ADDITIONAL_METRICS = flags.DEFINE_string(
+    "additional_metrics",
+    None,
+    "The ad-hoc metrics which are only needed by specific benchmarks.",
+)
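Each writer above is an absl script whose CLI is the union of common_flags.py and its own metric flags. A usage sketch for the checkpointing writer (module-style invocation and all flag values here are illustrative, not taken from this PR):

    python3 -m benchmarks.benchmark_db_writer.supplemental_metrics_writer.checkpointing_metrics_writer \
        --run_id=run-1234abcd \
        --is_test=true \
        --write_time_p50=11.0 --write_time_p90=18.2 --write_time_avg=12.5 \
        --write_time_stddev=2.1 --num_write_datapoints=20 \
        --additional_metrics='{"checkpoint_size_gib": 512}'

With --is_test the row goes to supercomputer-testing.mantaray_v2; otherwise it goes to ml-workload-benchmarks.benchmark_dataset_v2. Flags left unset stay None and should land as NULL values in the table.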
diff --git a/benchmarks/benchmark_db_writer/supplemental_metrics_writer/data_loading_metrics_writer.py b/benchmarks/benchmark_db_writer/supplemental_metrics_writer/data_loading_metrics_writer.py
new file mode 100644
index 0000000000..9b570ddd1b
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/supplemental_metrics_writer/data_loading_metrics_writer.py
@@ -0,0 +1,128 @@
+"""Update the data_loading_metrics table of the benchmark dataset."""
+
+import json
+import logging
+from typing import Sequence
+
+from absl import app
+from absl import flags
+
+from benchmarks.benchmark_db_writer import bq_writer_utils
+from benchmarks.benchmark_db_writer.schema.supplemental_metrics import (
+    data_loading_metrics_schema,
+)
+from benchmarks.benchmark_db_writer.schema.workload_benchmark_v2 import (
+    workload_benchmark_v2_schema,
+)
+from benchmarks.benchmark_db_writer.supplemental_metrics_writer import common_flags
+
+
+_DATA_LOADING_TOKENS_PER_SEC_P50 = flags.DEFINE_float(
+    "data_loading_tokens_per_sec_p50",
+    None,
+    "50 percentile of the egress throughput delivered by the storage system" " per step, in tokens per second.",
+)
+_DATA_LOADING_TOKENS_PER_SEC_P90 = flags.DEFINE_float(
+    "data_loading_tokens_per_sec_p90",
+    None,
+    "90 percentile of the egress throughput delivered by the storage system" " per step, in tokens per second.",
+)
+_DATA_LOADING_TOKENS_PER_SEC_P99 = flags.DEFINE_float(
+    "data_loading_tokens_per_sec_p99",
+    None,
+    "99 percentile of the egress throughput delivered by the storage system" " per step, in tokens per second.",
+)
+_DATA_LOADING_TOKENS_PER_SEC_P100 = flags.DEFINE_float(
+    "data_loading_tokens_per_sec_p100",
+    None,
+    "100 percentile of the egress throughput delivered by the storage system" " per step, in tokens per second.",
+)
+_ACCELERATOR_BLOCKED_TIME = flags.DEFINE_float(
+    "accelerator_blocked_time",
+    None,
+    "The duration an accelerator is unavailable for processing tasks due to" " data loading.",
+)
+_ACCELERATOR_BLOCKED_PERCENT = flags.DEFINE_float(
+    "accelerator_blocked_percent",
+    None,
+    "The percent of time an accelerator is unavailable for processing tasks" " due to data loading.",
+)
+
+
+logging.basicConfig(
+    format="%(asctime)s %(levelname)-8s %(message)s",
+    level=logging.INFO,
+    datefmt="%Y-%m-%d %H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
+
+def write_data_loading_metrics(
+    project,
+    dataset,
+    table,
+    dataclass_type,
+    run_id,
+    data_loading_tokens_per_sec_p50,
+    data_loading_tokens_per_sec_p90,
+    data_loading_tokens_per_sec_p99,
+    data_loading_tokens_per_sec_p100,
+    accelerator_blocked_time,
+    accelerator_blocked_percent,
+    additional_metrics,
+    is_test=False,
+):
+
+  if bq_writer_utils.validate_id(
+      logger,
+      run_id,
+      "run_summary",
+      "run_id",
+      workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema,
+      is_test,
+  ):
+
+    writer = bq_writer_utils.create_bq_writer_object(
+        project=project,
+        dataset=dataset,
+        table=table,
+        dataclass_type=dataclass_type,
+    )
+
+    data_loading_metrics_data = data_loading_metrics_schema.DataLoadingMetricsInfo(
+        run_id=run_id,
+        data_loading_tokens_per_sec_p50=data_loading_tokens_per_sec_p50,
+        data_loading_tokens_per_sec_p90=data_loading_tokens_per_sec_p90,
+        data_loading_tokens_per_sec_p99=data_loading_tokens_per_sec_p99,
+        data_loading_tokens_per_sec_p100=data_loading_tokens_per_sec_p100,
+        accelerator_blocked_time=accelerator_blocked_time,
+        accelerator_blocked_percent=accelerator_blocked_percent,
+        # Guard against an unset flag; json.loads(None) would raise TypeError.
+        additional_metrics=json.loads(additional_metrics) if additional_metrics is not None else None,
+    )
+
+    logging.info("Writing Data %s to %s table.",
data_loading_metrics_data, table) + writer.write([data_loading_metrics_data]) + + else: + raise ValueError("Could not upload data in data_loading_metrics table.") + + +def main(_: Sequence[str]): + write_data_loading_metrics( + project=("supercomputer-testing" if common_flags.IS_TEST.value else "ml-workload-benchmarks"), + dataset=("mantaray_v2" if common_flags.IS_TEST.value else "benchmark_dataset_v2"), + table="data_loading_metrics", + dataclass_type=data_loading_metrics_schema.DataLoadingMetricsInfo, + run_id=common_flags.RUN_ID.value, + data_loading_tokens_per_sec_p50=_DATA_LOADING_TOKENS_PER_SEC_P50.value, + data_loading_tokens_per_sec_p90=_DATA_LOADING_TOKENS_PER_SEC_P90.value, + data_loading_tokens_per_sec_p99=_DATA_LOADING_TOKENS_PER_SEC_P99.value, + data_loading_tokens_per_sec_p100=_DATA_LOADING_TOKENS_PER_SEC_P100.value, + accelerator_blocked_time=_ACCELERATOR_BLOCKED_TIME.value, + accelerator_blocked_percent=_ACCELERATOR_BLOCKED_PERCENT.value, + additional_metrics=common_flags.ADDITIONAL_METRICS.value, + is_test=common_flags.IS_TEST.value, + ) + + +if __name__ == "__main__": + app.run(main) diff --git a/benchmarks/benchmark_db_writer/supplemental_metrics_writer/listing_metrics_writer.py b/benchmarks/benchmark_db_writer/supplemental_metrics_writer/listing_metrics_writer.py new file mode 100644 index 0000000000..e3e21021e1 --- /dev/null +++ b/benchmarks/benchmark_db_writer/supplemental_metrics_writer/listing_metrics_writer.py @@ -0,0 +1,314 @@ +""" +Update the listing_metrics table of the benchmark dataset. +""" + +import logging +from typing import Sequence + +from absl import app +from absl import flags +from benchmarks.benchmark_db_writer import bq_writer_utils +from benchmarks.benchmark_db_writer.schema.supplemental_metrics import listing_metrics_schema +from benchmarks.benchmark_db_writer.schema.workload_benchmark_v2 import ( + workload_benchmark_v2_schema, +) +from benchmarks.benchmark_db_writer.supplemental_metrics_writer import common_flags + +_FIRST_ITERATION_METRICS_GCS_LIST_TIME_AVG = flags.DEFINE_float( + "first_iteration_metrics_gcs_list_time_avg", + None, + "The average time it takes to perform the first GCS listing, in seconds.", +) +_FIRST_ITERATION_METRICS_FUSE_LIST_TIME_AVG = flags.DEFINE_float( + "first_iteration_metrics_fuse_list_time_avg", + None, + "The average time it takes to perform the first GCS Fuse listing, in" " seconds.", +) +_FIRST_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_AVG = flags.DEFINE_float( + "first_iteration_metrics_gcs_subdir_list_time_avg", + None, + "The average time it takes to perform the first GCS subdirectory listing," " in seconds.", +) +_FIRST_ITERATION_METRICS_GCS_LIST_TIME_P50 = flags.DEFINE_float( + "first_iteration_metrics_gcs_list_time_p50", + None, + "50 percentile of the time it takes to perform the first GCS listing, in" " seconds.", +) +_FIRST_ITERATION_METRICS_FUSE_LIST_TIME_P50 = flags.DEFINE_float( + "first_iteration_metrics_fuse_list_time_p50", + None, + "50 percentile of the time it takes to perform the first GCS Fuse listing," " in seconds.", +) +_FIRST_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P50 = flags.DEFINE_float( + "first_iteration_metrics_gcs_subdir_list_time_p50", + None, + "50 percentile of the time it takes to perform the first GCS subdirectory" " listing, in seconds.", +) +_FIRST_ITERATION_METRICS_GCS_LIST_TIME_P90 = flags.DEFINE_float( + "first_iteration_metrics_gcs_list_time_p90", + None, + "90 percentile of the time it takes to perform the first GCS listing, in" " seconds.", +) 
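+# The remaining first-iteration flags below complete the same grid at p90, p99,
+# and p100, and the "subsq" flags repeat it for subsequent iterations: each of
+# {gcs, fuse, gcs_subdir} listing times at avg/p50/p90/p99/p100, in seconds.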
+_FIRST_ITERATION_METRICS_FUSE_LIST_TIME_P90 = flags.DEFINE_float( + "first_iteration_metrics_fuse_list_time_p90", + None, + "90 percentile of the time it takes to perform the first GCS Fuse listing," " in seconds.", +) +_FIRST_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P90 = flags.DEFINE_float( + "first_iteration_metrics_gcs_subdir_list_time_p90", + None, + "90 percentile of the time it takes to perform the first GCS subdirectory" " listing, in seconds.", +) +_FIRST_ITERATION_METRICS_GCS_LIST_TIME_P99 = flags.DEFINE_float( + "first_iteration_metrics_gcs_list_time_p99", + None, + "99 percentile of the time it takes to perform the first GCS listing, in" " seconds.", +) +_FIRST_ITERATION_METRICS_FUSE_LIST_TIME_P99 = flags.DEFINE_float( + "first_iteration_metrics_fuse_list_time_p99", + None, + "99 percentile of the time it takes to perform the first GCS Fuse listing," " in seconds.", +) +_FIRST_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P99 = flags.DEFINE_float( + "first_iteration_metrics_gcs_subdir_list_time_p99", + None, + "99 percentile of the time it takes to perform the first GCS subdirectory" " listing, in seconds.", +) +_FIRST_ITERATION_METRICS_GCS_LIST_TIME_P100 = flags.DEFINE_float( + "first_iteration_metrics_gcs_list_time_p100", + None, + "100 percentile of the time it takes to perform the first GCS listing, in" " seconds.", +) +_FIRST_ITERATION_METRICS_FUSE_LIST_TIME_P100 = flags.DEFINE_float( + "first_iteration_metrics_fuse_list_time_p100", + None, + "100 percentile of the time it takes to perform the first GCS Fuse listing," " in seconds.", +) +_FIRST_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P100 = flags.DEFINE_float( + "first_iteration_metrics_gcs_subdir_list_time_p100", + None, + "100 percentile of the time it takes to perform the first GCS subdirectory" " listing, in seconds.", +) +_SUBSQ_ITERATION_METRICS_GCS_LIST_TIME_AVG = flags.DEFINE_float( + "subsq_iteration_metrics_gcs_list_time_avg", + None, + "The average time it takes to perform the subsequent GCS listing, in" " seconds.", +) +_SUBSQ_ITERATION_METRICS_FUSE_LIST_TIME_AVG = flags.DEFINE_float( + "subsq_iteration_metrics_fuse_list_time_avg", + None, + "The average time it takes to perform the subsequent GCS Fuse listing, in" " seconds.", +) +_SUBSQ_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_AVG = flags.DEFINE_float( + "subsq_iteration_metrics_gcs_subdir_list_time_avg", + None, + "The average time it takes to perform the subsequent GCS subdirectory" " listing, in seconds.", +) +_SUBSQ_ITERATION_METRICS_GCS_LIST_TIME_P50 = flags.DEFINE_float( + "subsq_iteration_metrics_gcs_list_time_p50", + None, + "50 percentile of the time it takes to perform the subsequent GCS listing," " in seconds.", +) +_SUBSQ_ITERATION_METRICS_FUSE_LIST_TIME_P50 = flags.DEFINE_float( + "subsq_iteration_metrics_fuse_list_time_p50", + None, + "50 percentile of the time it takes to perform the subsequent GCS Fuse" " listing, in seconds.", +) +_SUBSQ_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P50 = flags.DEFINE_float( + "subsq_iteration_metrics_gcs_subdir_list_time_p50", + None, + "50 percentile of the time it takes to perform the subsequent GCS" " subdirectory listing, in seconds.", +) +_SUBSQ_ITERATION_METRICS_GCS_LIST_TIME_P90 = flags.DEFINE_float( + "subsq_iteration_metrics_gcs_list_time_p90", + None, + "90 percentile of the time it takes to perform the subsequent GCS listing," " in seconds.", +) +_SUBSQ_ITERATION_METRICS_FUSE_LIST_TIME_P90 = flags.DEFINE_float( + "subsq_iteration_metrics_fuse_list_time_p90", + None, + "90 percentile of the time it takes to perform 
the subsequent GCS Fuse" " listing, in seconds.", +) +_SUBSQ_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P90 = flags.DEFINE_float( + "subsq_iteration_metrics_gcs_subdir_list_time_p90", + None, + "90 percentile of the time it takes to perform the subsequent GCS" " subdirectory listing, in seconds.", +) +_SUBSQ_ITERATION_METRICS_GCS_LIST_TIME_P99 = flags.DEFINE_float( + "subsq_iteration_metrics_gcs_list_time_p99", + None, + "99 percentile of the time it takes to perform the subsequent GCS listing," " in seconds.", +) +_SUBSQ_ITERATION_METRICS_FUSE_LIST_TIME_P99 = flags.DEFINE_float( + "subsq_iteration_metrics_fuse_list_time_p99", + None, + "99 percentile of the time it takes to perform the subsequent GCS Fuse" " listing, in seconds.", +) +_SUBSQ_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P99 = flags.DEFINE_float( + "subsq_iteration_metrics_gcs_subdir_list_time_p99", + None, + "99 percentile of the time it takes to perform the subsequent GCS" " subdirectory listing, in seconds.", +) +_SUBSQ_ITERATION_METRICS_GCS_LIST_TIME_P100 = flags.DEFINE_float( + "subsq_iteration_metrics_gcs_list_time_p100", + None, + "100 percentile of the time it takes to perform the subsequent GCS listing," " in seconds.", +) +_SUBSQ_ITERATION_METRICS_FUSE_LIST_TIME_P100 = flags.DEFINE_float( + "subsq_iteration_metrics_fuse_list_time_p100", + None, + "100 percentile of the time it takes to perform the subsequent GCS Fuse" " listing, in seconds.", +) +_SUBSQ_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P100 = flags.DEFINE_float( + "subsq_iteration_metrics_gcs_subdir_list_time_p100", + None, + "100 percentile of the time it takes to perform the subsequent GCS" " subdirectory listing, in seconds.", +) + +logging.basicConfig( + format="%(asctime)s %(levelname)-8s %(message)s", + level=logging.INFO, + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) + + +def write_listing_metrics( + project, + dataset, + table, + dataclass_type, + run_id, + first_iteration_metrics_gcs_list_time_avg, + first_iteration_metrics_fuse_list_time_avg, + first_iteration_metrics_gcs_subdir_list_time_avg, + first_iteration_metrics_gcs_list_time_p50, + first_iteration_metrics_fuse_list_time_p50, + first_iteration_metrics_gcs_subdir_list_time_p50, + first_iteration_metrics_gcs_list_time_p90, + first_iteration_metrics_fuse_list_time_p90, + first_iteration_metrics_gcs_subdir_list_time_p90, + first_iteration_metrics_gcs_list_time_p99, + first_iteration_metrics_fuse_list_time_p99, + first_iteration_metrics_gcs_subdir_list_time_p99, + first_iteration_metrics_gcs_list_time_p100, + first_iteration_metrics_fuse_list_time_p100, + first_iteration_metrics_gcs_subdir_list_time_p100, + subsq_iteration_metrics_gcs_list_time_avg, + subsq_iteration_metrics_fuse_list_time_avg, + subsq_iteration_metrics_gcs_subdir_list_time_avg, + subsq_iteration_metrics_gcs_list_time_p50, + subsq_iteration_metrics_fuse_list_time_p50, + subsq_iteration_metrics_gcs_subdir_list_time_p50, + subsq_iteration_metrics_gcs_list_time_p90, + subsq_iteration_metrics_fuse_list_time_p90, + subsq_iteration_metrics_gcs_subdir_list_time_p90, + subsq_iteration_metrics_gcs_list_time_p99, + subsq_iteration_metrics_fuse_list_time_p99, + subsq_iteration_metrics_gcs_subdir_list_time_p99, + subsq_iteration_metrics_gcs_list_time_p100, + subsq_iteration_metrics_fuse_list_time_p100, + subsq_iteration_metrics_gcs_subdir_list_time_p100, + is_test=False, +): + + if bq_writer_utils.validate_id( + logger, + run_id, + "run_summary", + "run_id", + workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema, + is_test, 
+  ):
+
+    writer = bq_writer_utils.create_bq_writer_object(
+        project=project,
+        dataset=dataset,
+        table=table,
+        dataclass_type=dataclass_type,
+    )
+
+    listing_metrics_data = listing_metrics_schema.ListingMetricsInfo(
+        run_id=run_id,
+        first_iteration_metrics_gcs_list_time_avg=first_iteration_metrics_gcs_list_time_avg,
+        first_iteration_metrics_fuse_list_time_avg=first_iteration_metrics_fuse_list_time_avg,
+        first_iteration_metrics_gcs_subdir_list_time_avg=first_iteration_metrics_gcs_subdir_list_time_avg,
+        first_iteration_metrics_gcs_list_time_p50=first_iteration_metrics_gcs_list_time_p50,
+        first_iteration_metrics_fuse_list_time_p50=first_iteration_metrics_fuse_list_time_p50,
+        first_iteration_metrics_gcs_subdir_list_time_p50=first_iteration_metrics_gcs_subdir_list_time_p50,
+        first_iteration_metrics_gcs_list_time_p90=first_iteration_metrics_gcs_list_time_p90,
+        first_iteration_metrics_fuse_list_time_p90=first_iteration_metrics_fuse_list_time_p90,
+        first_iteration_metrics_gcs_subdir_list_time_p90=first_iteration_metrics_gcs_subdir_list_time_p90,
+        first_iteration_metrics_gcs_list_time_p99=first_iteration_metrics_gcs_list_time_p99,
+        first_iteration_metrics_fuse_list_time_p99=first_iteration_metrics_fuse_list_time_p99,
+        first_iteration_metrics_gcs_subdir_list_time_p99=first_iteration_metrics_gcs_subdir_list_time_p99,
+        first_iteration_metrics_gcs_list_time_p100=first_iteration_metrics_gcs_list_time_p100,
+        first_iteration_metrics_fuse_list_time_p100=first_iteration_metrics_fuse_list_time_p100,
+        first_iteration_metrics_gcs_subdir_list_time_p100=first_iteration_metrics_gcs_subdir_list_time_p100,
+        subsq_iteration_metrics_gcs_list_time_avg=subsq_iteration_metrics_gcs_list_time_avg,
+        subsq_iteration_metrics_fuse_list_time_avg=subsq_iteration_metrics_fuse_list_time_avg,
+        subsq_iteration_metrics_gcs_subdir_list_time_avg=subsq_iteration_metrics_gcs_subdir_list_time_avg,
+        subsq_iteration_metrics_gcs_list_time_p50=subsq_iteration_metrics_gcs_list_time_p50,
+        subsq_iteration_metrics_fuse_list_time_p50=subsq_iteration_metrics_fuse_list_time_p50,
+        subsq_iteration_metrics_gcs_subdir_list_time_p50=subsq_iteration_metrics_gcs_subdir_list_time_p50,
+        subsq_iteration_metrics_gcs_list_time_p90=subsq_iteration_metrics_gcs_list_time_p90,
+        subsq_iteration_metrics_fuse_list_time_p90=subsq_iteration_metrics_fuse_list_time_p90,
+        subsq_iteration_metrics_gcs_subdir_list_time_p90=subsq_iteration_metrics_gcs_subdir_list_time_p90,
+        subsq_iteration_metrics_gcs_list_time_p99=subsq_iteration_metrics_gcs_list_time_p99,
+        subsq_iteration_metrics_fuse_list_time_p99=subsq_iteration_metrics_fuse_list_time_p99,
+        subsq_iteration_metrics_gcs_subdir_list_time_p99=subsq_iteration_metrics_gcs_subdir_list_time_p99,
+        subsq_iteration_metrics_gcs_list_time_p100=subsq_iteration_metrics_gcs_list_time_p100,
+        subsq_iteration_metrics_fuse_list_time_p100=subsq_iteration_metrics_fuse_list_time_p100,
+        subsq_iteration_metrics_gcs_subdir_list_time_p100=subsq_iteration_metrics_gcs_subdir_list_time_p100,
+    )
+
+    logger.info("Writing data %s to %s table.", listing_metrics_data, table)
+    writer.write([listing_metrics_data])
+
+  else:
+    raise ValueError("Could not upload data in run summary table")
+
+
+def main(_: Sequence[str]):
+  write_listing_metrics(
+      project=("supercomputer-testing" if common_flags.IS_TEST.value else "ml-workload-benchmarks"),
+      dataset=("mantaray_v2" if common_flags.IS_TEST.value else "benchmark_dataset_v2"),
+      table="listing_metrics",
+      dataclass_type=listing_metrics_schema.ListingMetricsInfo,
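+      # IS_TEST flips between the staging pair (supercomputer-testing /
+      # mantaray_v2) and the production pair (ml-workload-benchmarks /
+      # benchmark_dataset_v2); the other supplemental-metrics writers in
+      # this change follow the same convention.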
run_id=common_flags.RUN_ID.value, + first_iteration_metrics_gcs_list_time_avg=_FIRST_ITERATION_METRICS_GCS_LIST_TIME_AVG.value, + first_iteration_metrics_fuse_list_time_avg=_FIRST_ITERATION_METRICS_FUSE_LIST_TIME_AVG.value, + first_iteration_metrics_gcs_subdir_list_time_avg=_FIRST_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_AVG.value, + first_iteration_metrics_gcs_list_time_p50=_FIRST_ITERATION_METRICS_GCS_LIST_TIME_P50.value, + first_iteration_metrics_fuse_list_time_p50=_FIRST_ITERATION_METRICS_FUSE_LIST_TIME_P50.value, + first_iteration_metrics_gcs_subdir_list_time_p50=_FIRST_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P50.value, + first_iteration_metrics_gcs_list_time_p90=_FIRST_ITERATION_METRICS_GCS_LIST_TIME_P90.value, + first_iteration_metrics_fuse_list_time_p90=_FIRST_ITERATION_METRICS_FUSE_LIST_TIME_P90.value, + first_iteration_metrics_gcs_subdir_list_time_p90=_FIRST_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P90.value, + first_iteration_metrics_gcs_list_time_p99=_FIRST_ITERATION_METRICS_GCS_LIST_TIME_P99.value, + first_iteration_metrics_fuse_list_time_p99=_FIRST_ITERATION_METRICS_FUSE_LIST_TIME_P99.value, + first_iteration_metrics_gcs_subdir_list_time_p99=_FIRST_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P99.value, + first_iteration_metrics_gcs_list_time_p100=_FIRST_ITERATION_METRICS_GCS_LIST_TIME_P100.value, + first_iteration_metrics_fuse_list_time_p100=_FIRST_ITERATION_METRICS_FUSE_LIST_TIME_P100.value, + first_iteration_metrics_gcs_subdir_list_time_p100=_FIRST_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P100.value, + subsq_iteration_metrics_gcs_list_time_avg=_SUBSQ_ITERATION_METRICS_GCS_LIST_TIME_AVG.value, + subsq_iteration_metrics_fuse_list_time_avg=_SUBSQ_ITERATION_METRICS_FUSE_LIST_TIME_AVG.value, + subsq_iteration_metrics_gcs_subdir_list_time_avg=_SUBSQ_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_AVG.value, + subsq_iteration_metrics_gcs_list_time_p50=_SUBSQ_ITERATION_METRICS_GCS_LIST_TIME_P50.value, + subsq_iteration_metrics_fuse_list_time_p50=_SUBSQ_ITERATION_METRICS_FUSE_LIST_TIME_P50.value, + subsq_iteration_metrics_gcs_subdir_list_time_p50=_SUBSQ_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P50.value, + subsq_iteration_metrics_gcs_list_time_p90=_SUBSQ_ITERATION_METRICS_GCS_LIST_TIME_P90.value, + subsq_iteration_metrics_fuse_list_time_p90=_SUBSQ_ITERATION_METRICS_FUSE_LIST_TIME_P90.value, + subsq_iteration_metrics_gcs_subdir_list_time_p90=_SUBSQ_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P90.value, + subsq_iteration_metrics_gcs_list_time_p99=_SUBSQ_ITERATION_METRICS_GCS_LIST_TIME_P99.value, + subsq_iteration_metrics_fuse_list_time_p99=_SUBSQ_ITERATION_METRICS_FUSE_LIST_TIME_P99.value, + subsq_iteration_metrics_gcs_subdir_list_time_p99=_SUBSQ_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P99.value, + subsq_iteration_metrics_gcs_list_time_p100=_SUBSQ_ITERATION_METRICS_GCS_LIST_TIME_P100.value, + subsq_iteration_metrics_fuse_list_time_p100=_SUBSQ_ITERATION_METRICS_FUSE_LIST_TIME_P100.value, + subsq_iteration_metrics_gcs_subdir_list_time_p100=_SUBSQ_ITERATION_METRICS_GCS_SUBDIR_LIST_TIME_P100.value, + is_test=common_flags.IS_TEST.value, + ) + + +if __name__ == "__main__": + app.run(main) diff --git a/benchmarks/benchmark_db_writer/supplemental_metrics_writer/network_metrics_writer.py b/benchmarks/benchmark_db_writer/supplemental_metrics_writer/network_metrics_writer.py new file mode 100644 index 0000000000..bfec50cdf3 --- /dev/null +++ b/benchmarks/benchmark_db_writer/supplemental_metrics_writer/network_metrics_writer.py @@ -0,0 +1,170 @@ +""" +Update the network_metrics table of the benchmark dataset. 
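+
+Example invocation (a sketch: the metric flag names match the DEFINE_float
+calls below, while --run_id and --is_test are assumed to be the flags
+defined in common_flags):
+
+  python3 -m benchmarks.benchmark_db_writer.supplemental_metrics_writer.network_metrics_writer \
+      --run_id=<run_id> \
+      --server_max_egress=1200.0 --server_avg_egress=950.0 \
+      --client_max_qps=30000.0 --client_avg_qps=21000.0 \
+      --is_test=true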
+""" + +import logging +from typing import Sequence + +from absl import app +from absl import flags +from benchmarks.benchmark_db_writer import bq_writer_utils +from benchmarks.benchmark_db_writer.schema.supplemental_metrics import network_metrics_schema +from benchmarks.benchmark_db_writer.schema.workload_benchmark_v2 import ( + workload_benchmark_v2_schema, +) +from benchmarks.benchmark_db_writer.supplemental_metrics_writer import common_flags + +_SERVER_MAX_EGRESS = flags.DEFINE_float( + "server_max_egress", + None, + "The peak value of the storage egress throughput from the server side.", +) +_SERVER_AVG_EGRESS = flags.DEFINE_float( + "server_avg_egress", + None, + "The average value of the storage egress throughput from the server side.", +) +_SERVER_MAX_INGRESS = flags.DEFINE_float( + "server_max_ingress", + None, + "The peak value of the storage ingress throughput from the server side.", +) +_SERVER_AVG_INGRESS = flags.DEFINE_float( + "server_avg_ingress", + None, + "The average value of the storage ingress throughput from the server side.", +) +_SERVER_MAX_QPS = flags.DEFINE_float( + "server_max_qps", + None, + "The peak value of the storage QPS from the server side.", +) +_SERVER_AVG_QPS = flags.DEFINE_float( + "server_avg_qps", + None, + "The average value of the storage QPS throughput from the server side.", +) +_CLIENT_MAX_EGRESS = flags.DEFINE_float( + "client_max_egress", + None, + "The peak value of the storage egress throughput from the client side.", +) +_CLIENT_AVG_EGRESS = flags.DEFINE_float( + "client_avg_egress", + None, + "The average value of the storage egress throughput from the client side.", +) +_CLIENT_MAX_INGRESS = flags.DEFINE_float( + "client_max_ingress", + None, + "The peak value of the storage ingress throughput from the client side.", +) +_CLIENT_AVG_INGRESS = flags.DEFINE_float( + "client_avg_ingress", + None, + "The average value of the storage ingress throughput from the client side.", +) +_CLIENT_MAX_QPS = flags.DEFINE_float( + "client_max_qps", + None, + "The peak value of the storage QPS from the client side.", +) +_CLIENT_AVG_QPS = flags.DEFINE_float( + "client_avg_qps", + None, + "The average value of the storage QPS from the client side.", +) + +logging.basicConfig( + format="%(asctime)s %(levelname)-8s %(message)s", + level=logging.INFO, + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) + + +def write_network_metrics( + project, + dataset, + table, + dataclass_type, + run_id, + server_max_egress, + server_avg_egress, + server_max_ingress, + server_avg_ingress, + server_max_qps, + server_avg_qps, + client_max_egress, + client_avg_egress, + client_max_ingress, + client_avg_ingress, + client_max_qps, + client_avg_qps, + is_test=False, +): + + if bq_writer_utils.validate_id( + logger, + run_id, + "run_summary", + "run_id", + workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema, + is_test, + ): + + writer = bq_writer_utils.create_bq_writer_object( + project=project, + dataset=dataset, + table=table, + dataclass_type=dataclass_type, + ) + + network_metrics_data = network_metrics_schema.NetworkMetricsInfo( + run_id=run_id, + server_max_egress=server_max_egress, + server_avg_egress=server_avg_egress, + server_max_ingress=server_max_ingress, + server_avg_ingress=server_avg_ingress, + server_max_qps=server_max_qps, + server_avg_qps=server_avg_qps, + client_max_egress=client_max_egress, + client_avg_egress=client_avg_egress, + client_max_ingress=client_max_ingress, + client_avg_ingress=client_avg_ingress, + 
client_max_qps=client_max_qps,
+        client_avg_qps=client_avg_qps,
+    )
+
+    logger.info("Writing data %s to %s table.", network_metrics_data, table)
+    writer.write([network_metrics_data])
+
+  else:
+    raise ValueError("Could not upload data in run summary table")
+
+
+def main(_: Sequence[str]):
+  write_network_metrics(
+      project=("supercomputer-testing" if common_flags.IS_TEST.value else "ml-workload-benchmarks"),
+      dataset=("mantaray_v2" if common_flags.IS_TEST.value else "benchmark_dataset_v2"),
+      table="network_metrics",
+      dataclass_type=network_metrics_schema.NetworkMetricsInfo,
+      run_id=common_flags.RUN_ID.value,
+      server_max_egress=_SERVER_MAX_EGRESS.value,
+      server_avg_egress=_SERVER_AVG_EGRESS.value,
+      server_max_ingress=_SERVER_MAX_INGRESS.value,
+      server_avg_ingress=_SERVER_AVG_INGRESS.value,
+      server_max_qps=_SERVER_MAX_QPS.value,
+      server_avg_qps=_SERVER_AVG_QPS.value,
+      client_max_egress=_CLIENT_MAX_EGRESS.value,
+      client_avg_egress=_CLIENT_AVG_EGRESS.value,
+      client_max_ingress=_CLIENT_MAX_INGRESS.value,
+      client_avg_ingress=_CLIENT_AVG_INGRESS.value,
+      client_max_qps=_CLIENT_MAX_QPS.value,
+      client_avg_qps=_CLIENT_AVG_QPS.value,
+      is_test=common_flags.IS_TEST.value,
+  )
+
+
+if __name__ == "__main__":
+  app.run(main)
diff --git a/benchmarks/benchmark_db_writer/tests/test_supplement_metrics/test_data_loading_metrics_writer.py b/benchmarks/benchmark_db_writer/tests/test_supplement_metrics/test_data_loading_metrics_writer.py
new file mode 100644
index 0000000000..a3ff31a344
--- /dev/null
+++ b/benchmarks/benchmark_db_writer/tests/test_supplement_metrics/test_data_loading_metrics_writer.py
@@ -0,0 +1,55 @@
+"""Tests for data_loading_metrics_writer.py."""
+
+from unittest import mock
+from benchmarks.benchmark_db_writer.schema.supplemental_metrics import data_loading_metrics_schema
+from benchmarks.benchmark_db_writer.supplemental_metrics_writer import data_loading_metrics_writer
+import pytest
+
+
+@mock.patch("benchmarks.benchmark_db_writer.bq_writer_utils.validate_id")
+def test_write_data_loading_metrics_success(mock_validate_id):
+  """Test write_data_loading_metrics valid case."""
+
+  mock_validate_id.return_value = True
+  try:
+    data_loading_metrics_writer.write_data_loading_metrics(
+        project="supercomputer-testing",
+        dataset="mantaray_v2",
+        table="data_loading_metrics",
+        dataclass_type=data_loading_metrics_schema.DataLoadingMetricsInfo,
+        run_id="run_id",
+        data_loading_tokens_per_sec_p50=50.0,
+        data_loading_tokens_per_sec_p90=90.0,
+        data_loading_tokens_per_sec_p99=99.0,
+        data_loading_tokens_per_sec_p100=100.0,
+        accelerator_blocked_time=55,
+        accelerator_blocked_percent=12.2,
+        additional_metrics='{"other_metrics":1}',
+        is_test=True,
+    )
+  except Exception as e:
+    pytest.fail(f"data_loading_metrics_writer() raised unexpected error: {str(e)}")
+
+
+@mock.patch("benchmarks.benchmark_db_writer.bq_writer_utils.validate_id")
+def test_write_data_loading_metrics_invalid_run_id(mock_validate_id):
+  """Test write_data_loading_metrics invalid run_id case."""
+
+  mock_validate_id.return_value = False
+  with pytest.raises(Exception) as err_info:
+    data_loading_metrics_writer.write_data_loading_metrics(
+        project="supercomputer-testing",
+        dataset="mantaray_v2",
+        table="data_loading_metrics",
+        dataclass_type=data_loading_metrics_schema.DataLoadingMetricsInfo,
+        run_id="run_id",
+        data_loading_tokens_per_sec_p50=50.0,
+        data_loading_tokens_per_sec_p90=90.0,
+        data_loading_tokens_per_sec_p99=99.0,
+        data_loading_tokens_per_sec_p100=100.0,
+        accelerator_blocked_time=55,
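+        # With validate_id patched to return False, the writer is expected
+        # to raise before any BigQuery client is created, so this test needs
+        # no network access (assumption based on the guard in the writer).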
accelerator_blocked_percent=12.2, + additional_metrics='{"other_metrics":1}', + is_test=True, + ) + assert str(err_info.value) == "Could not upload data in data_loading_metrics table." diff --git a/benchmarks/globals.py b/benchmarks/globals.py index ba3a625b72..90528eadf4 100644 --- a/benchmarks/globals.py +++ b/benchmarks/globals.py @@ -17,7 +17,7 @@ import os.path # This is the MaxText root: with "max_utils.py"; &etc. TODO: Replace `os.path.basename` with `os.path.abspath` -MAXTEXT_PKG_DIR = os.environ.get("MAXTEXT_PKG_DIR", "MaxText") +MAXTEXT_PKG_DIR = os.environ.get("MAXTEXT_PKG_DIR", "src/MaxText") # This is the maxtext repo root: with ".git" folder; "README.md"; "pyproject.toml"; &etc. MAXTEXT_REPO_ROOT = os.environ.get( diff --git a/benchmarks/maxtext_xpk_runner.py b/benchmarks/maxtext_xpk_runner.py index d968d04908..bd1c2ec815 100644 --- a/benchmarks/maxtext_xpk_runner.py +++ b/benchmarks/maxtext_xpk_runner.py @@ -158,6 +158,9 @@ def __post_init__(self): else: self.num_devices_per_slice = int(self.device_type.split("-")[1]) / 2 self.topology = "" + self.hardware_id = self.device_type.split("-")[0] + if self.hardware_id == "v5litepod": + self.hardware_id = "v5e" def wait_for_xpk_workload_completion(cluster_config: XpkClusterConfig, workload_name, xpk_path) -> int: @@ -341,6 +344,7 @@ def _build_args_from_config(wl_config: WorkloadConfig) -> dict: "model_id": wl_config.model.model_type, "hardware_id": wl_config.hardware_id, "software_id": "jax_maxtext", + "hardware_num_slices": wl_config.num_slices, "number_of_chips": wl_config.num_devices_per_slice * wl_config.num_slices, "container_image_name": wl_config.base_docker_image, "global_batch_size": per_device_batch_size * wl_config.num_devices_per_slice * wl_config.num_slices, @@ -445,7 +449,8 @@ def build_user_command( f"base_output_directory={wl_config.base_output_directory}", f"{vertex_tensorboard}", f"{run_name_command}", - f"{enable_metrics_cmd}" f"{upload_hlo_dump}", + f"{enable_metrics_cmd}", + f"{upload_hlo_dump}", ] ) return command diff --git a/benchmarks/recipes/runner_utils.py b/benchmarks/recipes/runner_utils.py index 031158b595..ecd1dbbe5b 100644 --- a/benchmarks/recipes/runner_utils.py +++ b/benchmarks/recipes/runner_utils.py @@ -34,6 +34,9 @@ def generate_and_run_workloads(user_config, num_slices_list, num_steps, priority num_slices_list: A list of the number of slices to be executed. num_steps: The number of steps for each workload. 
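+
+  Returns:
+    1 if BigQuery upload is enabled but bq_db_project or bq_db_dataset is
+    missing (the validation added below); assumption: callers treat a
+    non-zero return as failure.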
""" + if user_config.bq_enable and (not user_config.bq_db_project or not user_config.bq_db_dataset): + logging.error("Validation FAILED: BQ is enabled, but project or dataset is missing.") + return 1 xpk_workload_cmds = [] xpk_workload_names = [] @@ -65,6 +68,9 @@ def generate_and_run_workloads(user_config, num_slices_list, num_steps, priority xpk_path=user_config.xpk_path, num_steps=num_steps, priority=priority, + generate_metrics_and_upload_to_big_query=user_config.bq_enable, + db_project=user_config.bq_db_project, + db_dataset=user_config.bq_db_dataset, ) # Generate XPK command diff --git a/benchmarks/recipes/user_configs.py b/benchmarks/recipes/user_configs.py index 6c01c0dfc7..f50e912573 100644 --- a/benchmarks/recipes/user_configs.py +++ b/benchmarks/recipes/user_configs.py @@ -70,6 +70,11 @@ class UserConfig: selected_model_names: list[str] = dataclasses.field(default_factory=lambda: ["llama3_1_8b_8192"]) num_slices_list: list[int] = dataclasses.field(default_factory=lambda: [2]) + # BigQuery configuration + bq_enable: bool = False + bq_db_project: str = "" + bq_db_dataset: str = "" + # other configuration xpk_path: str = "~/xpk" max_restarts: int = 0 diff --git a/benchmarks/upload_metrics_to_bq.py b/benchmarks/upload_metrics_to_bq.py index 8576f3cc0e..6caea26abb 100644 --- a/benchmarks/upload_metrics_to_bq.py +++ b/benchmarks/upload_metrics_to_bq.py @@ -186,6 +186,12 @@ def add_parser_arguments(parser: argparse.ArgumentParser): default=True, help="Whether to use the testing project or production project", ) + parser.add_argument( + "--hardware_num_slices", + type=int, + required=False, + help="hardware slice number", + ) def download_metrics_file_locally(metrics_gcs_file: str, local_file: str) -> int: diff --git a/requirements.txt b/requirements.txt index cf3bf2d0d3..a450d73cc2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ aqtp array-record cloud-accelerator-diagnostics cloud-tpu-diagnostics +dacite datasets flax gcsfs @@ -10,6 +11,7 @@ google-api-python-client google-cloud-aiplatform google-cloud-monitoring grain[parquet] +gspread huggingface_hub jax!=0.7.1 jaxlib!=0.7.1