Skip to content

Commit

Permalink
feat: Expose max_retry_cnt parameter for Ray on Vertex BigQuery write
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 580975522
  • Loading branch information
matthew29tang authored and Copybara-Service committed Nov 9, 2023
1 parent 1a9e36f commit 568907c
Showing 1 changed file with 22 additions and 10 deletions.
32 changes: 22 additions & 10 deletions google/cloud/aiplatform/preview/vertex_ray/bigquery_datasource.py
Expand Up @@ -50,7 +50,8 @@
gapic_version=_BQS_GAPIC_VERSION, user_agent=f"ray-on-vertex/{_BQS_GAPIC_VERSION}"
)

MAX_RETRY_CNT = 20
DEFAULT_MAX_RETRY_CNT = 10
RATE_LIMIT_EXCEEDED_SLEEP_TIME = 11


class _BigQueryDatasourceReader(Reader):
Expand Down Expand Up @@ -177,6 +178,7 @@ def do_write(
ray_remote_args: Optional[Dict[str, Any]],
project_id: Optional[str] = None,
dataset: Optional[str] = None,
max_retry_cnt: Optional[int] = DEFAULT_MAX_RETRY_CNT,
) -> List[ObjectRef[WriteResult]]:
def _write_single_block(
block: Block, metadata: BlockMetadata, project_id: str, dataset: str
Expand All @@ -194,27 +196,37 @@ def _write_single_block(
pq.write_table(block, fp, compression="SNAPPY")

retry_cnt = 0
while retry_cnt < MAX_RETRY_CNT:
while retry_cnt <= max_retry_cnt:
with open(fp, "rb") as source_file:
job = client.load_table_from_file(
source_file, dataset, job_config=job_config
)
retry_cnt += 1
try:
logging.info(job.result())
break
except exceptions.Forbidden as e:
retry_cnt += 1
if retry_cnt > max_retry_cnt:
break
print(
"[Ray on Vertex AI]: Rate limit exceeded... Sleeping to try again"
"[Ray on Vertex AI]: A block write encountered"
+ f" a rate limit exceeded error {retry_cnt} time(s)."
+ " Sleeping to try again."
)
logging.debug(e)
time.sleep(11)

# Raise exception if retry_cnt hits MAX_RETRY_CNT
if retry_cnt >= MAX_RETRY_CNT:
time.sleep(RATE_LIMIT_EXCEEDED_SLEEP_TIME)

# Raise exception if retry_cnt exceeds max_retry_cnt
if retry_cnt > max_retry_cnt:
print(
f"[Ray on Vertex AI]: Maximum ({max_retry_cnt}) retry count exceeded."
+ " Ray will attempt to retry the block write via fault tolerance."
+ " For more information, see https://docs.ray.io/en/latest/ray-core/fault_tolerance/tasks.html"
)
raise RuntimeError(
f"[Ray on Vertex AI]: Write failed due to {MAX_RETRY_CNT}"
+ " repeated API rate limit exceeded responses"
f"[Ray on Vertex AI]: Write failed due to {retry_cnt}"
+ " repeated API rate limit exceeded responses. Consider"
+ " specifiying the max_retry_cnt kwarg with a higher value."
)

print("[Ray on Vertex AI]: Finished writing", metadata.num_rows, "rows")
Expand Down

0 comments on commit 568907c

Please sign in to comment.