diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py
index b7412346bd..cbda9bc640 100644
--- a/bigframes/session/bq_caching_executor.py
+++ b/bigframes/session/bq_caching_executor.py
@@ -15,6 +15,7 @@
 from __future__ import annotations
 
 import math
+import os
 import threading
 from typing import Literal, Mapping, Optional, Sequence, Tuple
 import warnings
@@ -58,7 +59,7 @@
 MAX_SUBTREE_FACTORINGS = 5
 _MAX_CLUSTER_COLUMNS = 4
 MAX_SMALL_RESULT_BYTES = 10 * 1024 * 1024 * 1024  # 10G
-
+_MAX_READ_STREAMS = os.cpu_count()
 
 SourceIdMapping = Mapping[str, str]
 
@@ -323,7 +324,10 @@ def _export_gbq(
             self.bqclient.update_table(table, ["schema"])
 
         return executor.ExecuteResult(
-            row_iter.to_arrow_iterable(),
+            row_iter.to_arrow_iterable(
+                bqstorage_client=self.bqstoragereadclient,
+                max_stream_count=_MAX_READ_STREAMS,
+            ),
             array_value.schema,
             query_job,
             total_bytes_processed=row_iter.total_bytes_processed,
@@ -668,7 +672,8 @@ def _execute_plan_gbq(
 
         return executor.ExecuteResult(
             _arrow_batches=iterator.to_arrow_iterable(
-                bqstorage_client=self.bqstoragereadclient
+                bqstorage_client=self.bqstoragereadclient,
+                max_stream_count=_MAX_READ_STREAMS,
             ),
             schema=og_schema,
             query_job=query_job,
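
# Note (not part of the change above): a minimal, hypothetical sketch of what the
# new arguments do, assuming a google-cloud-bigquery version whose
# RowIterator.to_arrow_iterable accepts bqstorage_client and max_stream_count.
# Passing a BigQuery Storage read client downloads result batches over the
# Storage Read API, and max_stream_count caps how many parallel read streams are
# opened; the diff bounds that at os.cpu_count() via _MAX_READ_STREAMS. The
# clients and query below are illustrative placeholders, not bigframes code.
import os

from google.cloud import bigquery
from google.cloud import bigquery_storage_v1

bqclient = bigquery.Client()
bqstorage_client = bigquery_storage_v1.BigQueryReadClient()

row_iter = bqclient.query(
    "SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare`"
).result()

# Stream Arrow record batches using at most os.cpu_count() read streams, so the
# number of concurrent downloads does not exceed the cores available to consume them.
for batch in row_iter.to_arrow_iterable(
    bqstorage_client=bqstorage_client,
    max_stream_count=os.cpu_count(),
):
    print(batch.num_rows)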