From 568907c3876b10dc104de5d19a973135b2638d62 Mon Sep 17 00:00:00 2001 From: Matthew Tang Date: Thu, 9 Nov 2023 11:36:41 -0800 Subject: [PATCH] feat: Expose max_retry_cnt parameter for Ray on Vertex BigQuery write PiperOrigin-RevId: 580975522 --- .../preview/vertex_ray/bigquery_datasource.py | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/google/cloud/aiplatform/preview/vertex_ray/bigquery_datasource.py b/google/cloud/aiplatform/preview/vertex_ray/bigquery_datasource.py index 01c4584425..213b12d483 100644 --- a/google/cloud/aiplatform/preview/vertex_ray/bigquery_datasource.py +++ b/google/cloud/aiplatform/preview/vertex_ray/bigquery_datasource.py @@ -50,7 +50,8 @@ gapic_version=_BQS_GAPIC_VERSION, user_agent=f"ray-on-vertex/{_BQS_GAPIC_VERSION}" ) -MAX_RETRY_CNT = 20 +DEFAULT_MAX_RETRY_CNT = 10 +RATE_LIMIT_EXCEEDED_SLEEP_TIME = 11 class _BigQueryDatasourceReader(Reader): @@ -177,6 +178,7 @@ def do_write( ray_remote_args: Optional[Dict[str, Any]], project_id: Optional[str] = None, dataset: Optional[str] = None, + max_retry_cnt: Optional[int] = DEFAULT_MAX_RETRY_CNT, ) -> List[ObjectRef[WriteResult]]: def _write_single_block( block: Block, metadata: BlockMetadata, project_id: str, dataset: str @@ -194,27 +196,37 @@ def _write_single_block( pq.write_table(block, fp, compression="SNAPPY") retry_cnt = 0 - while retry_cnt < MAX_RETRY_CNT: + while retry_cnt <= max_retry_cnt: with open(fp, "rb") as source_file: job = client.load_table_from_file( source_file, dataset, job_config=job_config ) - retry_cnt += 1 try: logging.info(job.result()) break except exceptions.Forbidden as e: + retry_cnt += 1 + if retry_cnt > max_retry_cnt: + break print( - "[Ray on Vertex AI]: Rate limit exceeded... Sleeping to try again" + "[Ray on Vertex AI]: A block write encountered" + + f" a rate limit exceeded error {retry_cnt} time(s)." + + " Sleeping to try again." 
) logging.debug(e) - time.sleep(11) - - # Raise exception if retry_cnt hits MAX_RETRY_CNT - if retry_cnt >= MAX_RETRY_CNT: + time.sleep(RATE_LIMIT_EXCEEDED_SLEEP_TIME) + + # Raise exception if retry_cnt exceeds max_retry_cnt + if retry_cnt > max_retry_cnt: + print( + f"[Ray on Vertex AI]: Maximum ({max_retry_cnt}) retry count exceeded." + + " Ray will attempt to retry the block write via fault tolerance." + + " For more information, see https://docs.ray.io/en/latest/ray-core/fault_tolerance/tasks.html" + ) raise RuntimeError( - f"[Ray on Vertex AI]: Write failed due to {MAX_RETRY_CNT}" - + " repeated API rate limit exceeded responses" + f"[Ray on Vertex AI]: Write failed due to {retry_cnt}" + + " repeated API rate limit exceeded responses. Consider" + + " specifiying the max_retry_cnt kwarg with a higher value." ) print("[Ray on Vertex AI]: Finished writing", metadata.num_rows, "rows")