Skip to content

Commit

Permalink
feat: Add option to not overwrite table in Ray on Vertex BQ Write
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 598997529
  • Loading branch information
matthew29tang authored and Copybara-Service committed Jan 17, 2024
1 parent c708f87 commit a99e992
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions google/cloud/aiplatform/preview/vertex_ray/bigquery_datasource.py
Expand Up @@ -179,6 +179,7 @@ def do_write(
project_id: Optional[str] = None,
dataset: Optional[str] = None,
max_retry_cnt: Optional[int] = DEFAULT_MAX_RETRY_CNT,
overwrite_table: Optional[bool] = True,
) -> List[ObjectRef[WriteResult]]:
def _write_single_block(
block: Block, metadata: BlockMetadata, project_id: str, dataset: str
Expand Down Expand Up @@ -251,16 +252,22 @@ def _write_single_block(
dataset_id = dataset.split(".", 1)[0]
try:
client.get_dataset(dataset_id)
print(
f"[Ray on Vertex AI]: Dataset {dataset_id} already exists."
+ "The table will be overwritten if it already exists."
)
except exceptions.NotFound:
client.create_dataset(f"{project_id}.{dataset_id}", timeout=30)
print(f"[Ray on Vertex AI]: Created dataset {dataset_id}")

# Delete table if it already exists
client.delete_table(f"{project_id}.{dataset}", not_found_ok=True)
# Delete table if overwrite_table is True
if overwrite_table:
print(
f"[Ray on Vertex AI]: Attempting to delete table {dataset}"
+ " if it already exists since kwarg overwrite_table = True."
)
client.delete_table(f"{project_id}.{dataset}", not_found_ok=True)
else:
print(
f"[Ray on Vertex AI]: The write will append to table {dataset}"
+ " if it already exists since kwarg overwrite_table = False."
)

print("[Ray on Vertex AI]: Writing", len(blocks), "blocks")
for i in range(len(blocks)):
Expand Down

0 comments on commit a99e992

Please sign in to comment.