diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py
index f8a379aee9..465fa08187 100644
--- a/bigframes/session/_io/bigquery/read_gbq_table.py
+++ b/bigframes/session/_io/bigquery/read_gbq_table.py
@@ -28,6 +28,7 @@
 import google.cloud.bigquery as bigquery
 import google.cloud.bigquery.table
 
+import bigframes.core
 import bigframes.core.events
 import bigframes.exceptions as bfe
 import bigframes.session._io.bigquery
@@ -37,18 +38,79 @@
 import bigframes.session
 
 
+def _convert_information_schema_table_id_to_table_reference(
+    table_id: str,
+    default_project: Optional[str],
+) -> bigquery.TableReference:
+    """Squeeze an INFORMATION_SCHEMA reference into a TableReference.
+    This is kind of a hack. INFORMATION_SCHEMA is a view that isn't available
+    via the tables.get REST API.
+    """
+    parts = table_id.split(".")
+    parts_casefold = [part.casefold() for part in parts]
+    dataset_index = parts_casefold.index("INFORMATION_SCHEMA".casefold())
+
+    if dataset_index == 0:
+        project = default_project
+    else:
+        project = ".".join(parts[:dataset_index])
+
+    if project is None:
+        message = (
+            "Could not determine project ID. "
+            "Please provide a project or region in your INFORMATION_SCHEMA table ID. "
+            "For example, 'region-REGION_NAME.INFORMATION_SCHEMA.JOBS'."
+        )
+        raise ValueError(message)
+
+    dataset = "INFORMATION_SCHEMA"
+    table_id_short = ".".join(parts[dataset_index + 1 :])
+    return bigquery.TableReference(
+        bigquery.DatasetReference(project, dataset),
+        table_id_short,
+    )
+
+
+def get_information_schema_metadata(
+    bqclient: bigquery.Client,
+    table_id: str,
+    default_project: Optional[str],
+) -> bigquery.Table:
+    job_config = bigquery.QueryJobConfig(dry_run=True)
+    job = bqclient.query(
+        f"SELECT * FROM `{table_id}`",
+        job_config=job_config,
+    )
+    table_ref = _convert_information_schema_table_id_to_table_reference(
+        table_id=table_id,
+        default_project=default_project,
+    )
+    table = bigquery.Table.from_api_repr(
+        {
+            "tableReference": table_ref.to_api_repr(),
+            "location": job.location,
+            # Prevent ourselves from trying to read the table with the BQ
+            # Storage API.
+            "type": "VIEW",
+        }
+    )
+    table.schema = job.schema
+    return table
+
+
 def get_table_metadata(
     bqclient: bigquery.Client,
-    table_ref: google.cloud.bigquery.table.TableReference,
-    bq_time: datetime.datetime,
     *,
-    cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]],
+    table_id: str,
+    default_project: Optional[str],
+    bq_time: datetime.datetime,
+    cache: Dict[str, Tuple[datetime.datetime, bigquery.Table]],
     use_cache: bool = True,
     publisher: bigframes.core.events.Publisher,
 ) -> Tuple[datetime.datetime, google.cloud.bigquery.table.Table]:
     """Get the table metadata, either from cache or via REST API."""
 
-    cached_table = cache.get(table_ref)
+    cached_table = cache.get(table_id)
     if use_cache and cached_table is not None:
         snapshot_timestamp, table = cached_table
 
@@ -90,7 +152,16 @@ def get_table_metadata(
 
         return cached_table
 
-    table = bqclient.get_table(table_ref)
+    if is_information_schema(table_id):
+        table = get_information_schema_metadata(
+            bqclient=bqclient, table_id=table_id, default_project=default_project
+        )
+    else:
+        table_ref = google.cloud.bigquery.table.TableReference.from_string(
+            table_id, default_project=default_project
+        )
+        table = bqclient.get_table(table_ref)
+
     # local time will lag a little bit due to network latency
     # make sure it is at least table creation time.
     # This is relevant if the table was created immediately before loading it here.
@@ -98,10 +169,21 @@
         bq_time = table.created
 
     cached_table = (bq_time, table)
-    cache[table_ref] = cached_table
+    cache[table_id] = cached_table
     return cached_table
 
 
+def is_information_schema(table_id: str) -> bool:
+    table_id_casefold = table_id.casefold()
+    # Include the "."s to ensure we don't have false positives for a
+    # user-defined dataset like MY_INFORMATION_SCHEMA or tables called
+    # INFORMATION_SCHEMA.
+    return (
+        ".INFORMATION_SCHEMA.".casefold() in table_id_casefold
+        or table_id_casefold.startswith("INFORMATION_SCHEMA.".casefold())
+    )
+
+
 def is_time_travel_eligible(
     bqclient: bigquery.Client,
     table: google.cloud.bigquery.table.Table,
@@ -168,6 +250,8 @@ def is_time_travel_eligible(
             msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel
         )
         return False
+    elif table.table_type == "VIEW":
+        return False
 
     # table might support time travel, let's do a dry-run query with time travel
     if should_dry_run:
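A quick sanity check of the new helper's parsing, as a hedged sketch: everything to the left of `INFORMATION_SCHEMA` (region included) is folded into the project slot of the `TableReference`, which is the "hack" the docstring mentions. This assumes only the private helper and module path introduced in this diff.

```python
from bigframes.session._io.bigquery.read_gbq_table import (
    _convert_information_schema_table_id_to_table_reference,
)

# Region-qualified ID: the region rides along in the "project" slot.
ref = _convert_information_schema_table_id_to_table_reference(
    "my-project.region-US.INFORMATION_SCHEMA.JOBS", default_project=None
)
assert ref.project == "my-project.region-US"
assert ref.dataset_id == "INFORMATION_SCHEMA"
assert ref.table_id == "JOBS"

# No leading parts: the default project fills in.
ref = _convert_information_schema_table_id_to_table_reference(
    "INFORMATION_SCHEMA.SCHEMATA", default_project="my-project"
)
assert ref.project == "my-project"
```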
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index 940fdc1352..2d5dec13e6 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -47,6 +47,8 @@
 import pandas
 import pyarrow as pa
 
+import bigframes._tools
+import bigframes._tools.strings
 from bigframes.core import guid, identifiers, local_data, nodes, ordering, utils
 import bigframes.core as core
 import bigframes.core.blocks as blocks
@@ -272,9 +274,7 @@ def __init__(
         self._default_index_type = default_index_type
         self._scan_index_uniqueness = scan_index_uniqueness
         self._force_total_order = force_total_order
-        self._df_snapshot: Dict[
-            bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]
-        ] = {}
+        self._df_snapshot: Dict[str, Tuple[datetime.datetime, bigquery.Table]] = {}
         self._metrics = metrics
         self._publisher = publisher
         # Unfortunate circular reference, but need to pass reference when constructing objects
@@ -629,10 +629,6 @@ def read_gbq_table(
 
         _check_duplicates("columns", columns)
 
-        table_ref = google.cloud.bigquery.table.TableReference.from_string(
-            table_id, default_project=self._bqclient.project
-        )
-
         columns = list(columns)
         include_all_columns = columns is None or len(columns) == 0
         filters = typing.cast(list, list(filters))
@@ -643,7 +639,8 @@ def read_gbq_table(
 
         time_travel_timestamp, table = bf_read_gbq_table.get_table_metadata(
             self._bqclient,
-            table_ref=table_ref,
+            table_id=table_id,
+            default_project=self._bqclient.project,
             bq_time=self._clock.get_time(),
             cache=self._df_snapshot,
             use_cache=use_cache,
@@ -706,18 +703,23 @@ def read_gbq_table(
         # Optionally, execute the query
         # -----------------------------
 
-        # max_results introduces non-determinism and limits the cost on
-        # clustered tables, so fall back to a query. We do this here so that
-        # the index is consistent with tables that have primary keys, even
-        # when max_results is set.
-        if max_results is not None:
+        if (
+            # max_results introduces non-determinism and limits the cost on
+            # clustered tables, so fall back to a query. We do this here so
+            # that the index is consistent with tables that have primary keys,
+            # even when max_results is set.
+            max_results is not None
+            # Views such as INFORMATION_SCHEMA can introduce non-determinism.
+            # They can update frequently and don't support time travel.
+            or bf_read_gbq_table.is_information_schema(table_id)
+        ):
             # TODO(b/338111344): If we are running a query anyway, we might as
             # well generate ROW_NUMBER() at the same time.
             all_columns: Iterable[str] = (
                 itertools.chain(index_cols, columns) if columns else ()
             )
             query = bf_io_bigquery.to_query(
-                table_id,
+                f"{table.project}.{table.dataset_id}.{table.table_id}",
                 columns=all_columns,
                 sql_predicate=bf_io_bigquery.compile_filters(filters)
                 if filters
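For context, here is how the fallback is expected to surface to users, sketched against the public `bigframes.pandas` API with a hypothetical region. Because `is_information_schema(table_id)` is true for this ID, `read_gbq` takes the query path above rather than a direct table read.

```python
import bigframes.pandas as bpd

# The system tests below use an unordered session; partial ordering mode is
# the equivalent setting for the global session (an assumption, not required).
bpd.options.bigquery.ordering_mode = "partial"

df = bpd.read_gbq("region-US.INFORMATION_SCHEMA.SCHEMATA")
print(df.peek())
```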
diff --git a/bigframes/session/read_api_execution.py b/bigframes/session/read_api_execution.py
index 2530a1dc8d..136c279c08 100644
--- a/bigframes/session/read_api_execution.py
+++ b/bigframes/session/read_api_execution.py
@@ -46,6 +46,9 @@ def execute(
         if node.explicitly_ordered and ordered:
             return None
 
+        if not node.source.table.is_physically_stored:
+            return None
+
         if limit is not None:
             if peek is None or limit < peek:
                 peek = limit
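The guard above keeps the Storage Read API executor away from sources without physical storage, such as the `"VIEW"` metadata synthesized in `get_information_schema_metadata`. That function's dry-run trick can be reproduced with `google-cloud-bigquery` alone; a sketch, assuming default credentials and a recent client library version where `QueryJob.schema` is populated on dry runs:

```python
from google.cloud import bigquery

client = bigquery.Client()

# A dry run executes nothing and costs nothing, but still reports the job
# location and the result schema.
job = client.query(
    "SELECT * FROM `region-US.INFORMATION_SCHEMA.SCHEMATA`",
    job_config=bigquery.QueryJobConfig(dry_run=True),
)
print(job.location)
print([field.name for field in job.schema])
```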
diff --git a/tests/system/small/pandas/test_read_gbq_information_schema.py b/tests/system/small/pandas/test_read_gbq_information_schema.py
new file mode 100644
index 0000000000..32e2dc4712
--- /dev/null
+++ b/tests/system/small/pandas/test_read_gbq_information_schema.py
@@ -0,0 +1,50 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+
+@pytest.mark.parametrize("include_project", [True, False])
+@pytest.mark.parametrize(
+    "view_id",
+    [
+        # https://cloud.google.com/bigquery/docs/information-schema-intro
+        "region-US.INFORMATION_SCHEMA.SESSIONS_BY_USER",
+        "region-US.INFORMATION_SCHEMA.SCHEMATA",
+    ],
+)
+def test_read_gbq_jobs_by_user_returns_schema(
+    unordered_session, view_id: str, include_project: bool
+):
+    if include_project:
+        table_id = unordered_session.bqclient.project + "." + view_id
+    else:
+        table_id = view_id
+
+    df = unordered_session.read_gbq(table_id, max_results=10)
+    assert df.dtypes is not None
+
+
+def test_read_gbq_schemata_can_be_peeked(unordered_session):
+    df = unordered_session.read_gbq("region-US.INFORMATION_SCHEMA.SCHEMATA")
+    result = df.peek()
+    assert result is not None
+
+
+def test_read_gbq_schemata_four_parts_can_be_peeked(unordered_session):
+    df = unordered_session.read_gbq(
+        f"{unordered_session.bqclient.project}.region-US.INFORMATION_SCHEMA.SCHEMATA"
+    )
+    result = df.peek()
+    assert result is not None
diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py
index d05957b941..f003398706 100644
--- a/tests/unit/session/test_session.py
+++ b/tests/unit/session/test_session.py
@@ -242,7 +242,7 @@ def test_read_gbq_cached_table():
     table._properties["numRows"] = "1000000000"
     table._properties["location"] = session._location
     table._properties["type"] = "TABLE"
-    session._loader._df_snapshot[table_ref] = (
+    session._loader._df_snapshot[str(table_ref)] = (
         datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),
         table,
     )
@@ -273,7 +273,7 @@ def test_read_gbq_cached_table_doesnt_warn_for_anonymous_tables_and_doesnt_inclu
     table._properties["numRows"] = "1000000000"
     table._properties["location"] = session._location
     table._properties["type"] = "TABLE"
-    session._loader._df_snapshot[table_ref] = (
+    session._loader._df_snapshot[str(table_ref)] = (
         datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),
         table,
     )
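These unit-test updates lean on `str(TableReference)` producing the dotted table ID, so string keys stay compatible with call sites that previously keyed the snapshot cache by `TableReference`. A minimal check, assuming nothing beyond `google-cloud-bigquery`:

```python
from google.cloud import bigquery

ref = bigquery.TableReference(
    bigquery.DatasetReference("my-project", "my_dataset"), "my_table"
)

# str() yields the standard dotted form used as the new cache key.
assert str(ref) == "my-project.my_dataset.my_table"
```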