Skip to content

Commit d4d8855

Browse files
authored
feat: support pandas inputs in more bigframes.bigquery functions (#17224)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/google-cloud-python/issues) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕
1 parent 529479e commit d4d8855

4 files changed

Lines changed: 306 additions & 2 deletions

File tree

packages/bigframes/bigframes/core/googlesql.py

Lines changed: 36 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,14 +16,41 @@
1616

1717
from __future__ import annotations
1818

19-
from typing import Any, Union
19+
from typing import TYPE_CHECKING, Any, Optional, Union
20+
21+
import pandas as pd
2022

2123
import bigframes.core.col
2224
import bigframes.core.expression as ex
25+
import bigframes.core.global_session as global_session
2326
import bigframes.core.sentinels as sentinels
2427
import bigframes.series as series
2528
from bigframes.operations import googlesql
2629

30+
if TYPE_CHECKING:
31+
import bigframes.session
32+
33+
34+
def _is_pandas_series(arg: Any) -> bool:
    """Report whether ``arg`` is an instance of ``pandas.Series``.

    Args:
        arg: Any value passed to a GoogleSQL scalar op.

    Returns:
        ``True`` if ``arg`` is a ``pd.Series`` (or subclass), else ``False``.
    """
    is_series = isinstance(arg, pd.Series)
    return is_series
36+
37+
38+
def _find_session(*args: Any) -> Optional[bigframes.session.Session]:
    """Return the session attached to the first BigFrames object in ``args``.

    Arguments are scanned in positional order; only BigFrames ``Series``,
    ``DataFrame`` and ``Index`` instances are considered.

    Args:
        *args: Arbitrary operands (BigFrames objects, pandas objects,
            expressions, literals, ...).

    Returns:
        The ``_session`` of the first matching argument, or ``None`` when no
        argument is a BigFrames Series, DataFrame or Index.
    """
    # Imported at call time rather than module level
    # (presumably to avoid a circular import -- TODO confirm).
    import bigframes.core.indexes as indexes
    import bigframes.dataframe as dataframe

    session_bound_types = (series.Series, dataframe.DataFrame, indexes.Index)
    return next(
        (
            candidate._session
            for candidate in args
            if isinstance(candidate, session_bound_types)
        ),
        None,
    )
46+
47+
48+
def _get_session(*args: Any) -> bigframes.session.Session:
    """Pick the session to use for ``args``, falling back to the global one.

    Args:
        *args: Arbitrary operands; forwarded unchanged to ``_find_session``.

    Returns:
        The session found among ``args`` by ``_find_session``; if that lookup
        yields ``None``, the result of ``global_session.get_global_session()``.
    """
    found = _find_session(*args)
    return global_session.get_global_session() if found is None else found
53+
2754

2855
def apply_googlesql_scalar_op(
2956
op: googlesql.GoogleSqlScalarOp,
@@ -44,6 +71,14 @@ def apply_googlesql_scalar_op(
4471
The result of the operation. If any of ``args`` is a Series, returns
4572
a Series. Otherwise, returns an Expression.
4673
"""
74+
has_pandas_series = any(_is_pandas_series(arg) for arg in args)
75+
76+
if has_pandas_series:
77+
session = _get_session(*args)
78+
args = tuple(
79+
session.read_pandas(arg) if _is_pandas_series(arg) else arg for arg in args
80+
)
81+
4782
# Find the first Series to use for alignment
4883
first_series = None
4984
for arg in args:

packages/bigframes/docs/reference/index.rst

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@ packages.
99

1010
bigframes._config
1111
bigframes.bigquery
12+
bigframes.bigquery.aead
1213
bigframes.bigquery.ai
1314
bigframes.bigquery.ml
1415
bigframes.bigquery.obj

packages/bigframes/tests/system/small/bigquery/test_array.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,7 @@
6868
],
6969
)
7070
def test_array_length(input_data, expected):
71-
series = bpd.Series(input_data)
71+
series = pd.Series(input_data)
7272
expected = pd.Series(
7373
expected,
7474
index=pd.Index(range(len(input_data)), dtype="Int64"),
Lines changed: 268 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,268 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import unittest.mock as mock
16+
17+
import pandas as pd
18+
19+
import bigframes.core.col as col
20+
import bigframes.core.expression as ex
21+
import bigframes.core.global_session
22+
import bigframes.core.googlesql as core_googlesql
23+
import bigframes.series as series
24+
from bigframes.operations import googlesql
25+
from bigframes.testing import mocks
26+
27+
# Define a test op
28+
# Two-argument scalar op named "TEST_OP"; its signature callable accepts any
# arguments and returns None. Shared by the tests in this module.
_TEST_OP = googlesql.GoogleSqlScalarOp(
    "TEST_OP",
    args=tuple(googlesql.ArgSpec() for _ in range(2)),
    signature=lambda *args: None,
)
33+
34+
35+
def test_apply_googlesql_scalar_op_expressions():
    """With only column expressions as operands, the result is an Expression."""
    operands = (col.col("a"), col.col("b"))

    outcome = core_googlesql.apply_googlesql_scalar_op(_TEST_OP, *operands)

    assert isinstance(outcome, col.Expression)
43+
44+
45+
def test_apply_googlesql_scalar_op_pandas_series_global_session(monkeypatch):
    """A pandas Series with no BigFrames operand is uploaded via the global session."""
    # Setup mock session
    session = mocks.create_bigquery_session()
    monkeypatch.setattr(bigframes.core.global_session, "_global_session", session)
    # Patch the flag instead of assigning it, so pytest restores the previous
    # value at teardown and the change cannot leak into other tests.
    monkeypatch.setattr(bigframes.options.bigquery, "_session_started", True)

    # Create a real-ish Series to return from read_pandas
    df = mocks.create_dataframe(monkeypatch, session=session, data={"col": [1, 2, 3]})
    bf_series = df["col"]

    # Mock read_pandas on the session
    mock_read_pandas = mock.MagicMock(return_value=bf_series)
    session.read_pandas = mock_read_pandas  # type: ignore

    # Mock _apply_nary_op on Series class to avoid real compilation/execution
    mock_apply_nary_op = mock.MagicMock(return_value=bf_series)
    monkeypatch.setattr(series.Series, "_apply_nary_op", mock_apply_nary_op)

    pd_series = pd.Series([1, 2, 3])

    # Call the function with a pandas Series and a literal
    result = core_googlesql.apply_googlesql_scalar_op(_TEST_OP, pd_series, 42)

    # Verify read_pandas was called on the global session
    mock_read_pandas.assert_called_once_with(pd_series)

    # Verify _apply_nary_op was called on the converted series
    mock_apply_nary_op.assert_called_once()
    # First arg to _apply_nary_op is the op, second is the processed_args
    assert mock_apply_nary_op.call_args[0][0] == _TEST_OP
    # processed_args should contain the converted bf_series and the literal 42
    processed_args = mock_apply_nary_op.call_args[0][1]
    assert processed_args[0] is bf_series
    assert processed_args[1] == 42

    # Verify result is a Series
    assert isinstance(result, series.Series)
82+
83+
84+
def test_apply_googlesql_scalar_op_pandas_series_with_bf_series(monkeypatch):
    """A pandas Series is uploaded via the BigFrames Series' session, not the global one."""
    # Setup mock session 1 (global) and session 2 (associated with bf_series)
    global_session = mocks.create_bigquery_session(session_id="global")
    monkeypatch.setattr(
        bigframes.core.global_session, "_global_session", global_session
    )
    # Patch the flag instead of assigning it, so pytest restores the previous
    # value at teardown and the change cannot leak into other tests.
    monkeypatch.setattr(bigframes.options.bigquery, "_session_started", True)

    bf_session = mocks.create_bigquery_session(session_id="bf_session")

    # Create a bf_series associated with bf_session
    df = mocks.create_dataframe(
        monkeypatch, session=bf_session, data={"col": [1, 2, 3]}
    )
    bf_series = df["col"]

    assert bf_series._session == bf_session

    # Mock read_pandas on both sessions
    mock_global_read_pandas = mock.MagicMock()
    global_session.read_pandas = mock_global_read_pandas  # type: ignore

    mock_bf_read_pandas = mock.MagicMock(return_value=bf_series)
    bf_session.read_pandas = mock_bf_read_pandas  # type: ignore

    # Mock _apply_nary_op
    mock_apply_nary_op = mock.MagicMock(return_value=bf_series)
    monkeypatch.setattr(series.Series, "_apply_nary_op", mock_apply_nary_op)

    pd_series = pd.Series([1, 2, 3])

    # Call with both pandas Series and BigFrames Series
    result = core_googlesql.apply_googlesql_scalar_op(_TEST_OP, pd_series, bf_series)

    # Verify read_pandas was called on bf_session, NOT global_session
    mock_bf_read_pandas.assert_called_once_with(pd_series)
    mock_global_read_pandas.assert_not_called()

    # Verify _apply_nary_op was called
    mock_apply_nary_op.assert_called_once()
    processed_args = mock_apply_nary_op.call_args[0][1]
    # Both arguments to the op should now be BigFrames Series
    assert processed_args[0] is bf_series
    assert processed_args[1] is bf_series

    assert isinstance(result, series.Series)
130+
131+
132+
def test_apply_googlesql_scalar_op_mixed_args(monkeypatch):
    """A pandas Series, an Expression and a literal can be combined in one call."""
    session = mocks.create_bigquery_session()
    monkeypatch.setattr(bigframes.core.global_session, "_global_session", session)
    # Patch the flag instead of assigning it, so pytest restores the previous
    # value at teardown and the change cannot leak into other tests.
    monkeypatch.setattr(bigframes.options.bigquery, "_session_started", True)

    df = mocks.create_dataframe(monkeypatch, session=session, data={"col": [1, 2, 3]})
    bf_series = df["col"]

    mock_read_pandas = mock.MagicMock(return_value=bf_series)
    session.read_pandas = mock_read_pandas  # type: ignore

    mock_apply_nary_op = mock.MagicMock(return_value=bf_series)
    monkeypatch.setattr(series.Series, "_apply_nary_op", mock_apply_nary_op)

    pd_series = pd.Series([1, 2, 3])
    expr = col.Expression(ex.const(10))

    # Call with pandas Series, Expression, and Literal
    result = core_googlesql.apply_googlesql_scalar_op(_TEST_OP, pd_series, expr, 42)

    # Verify pandas Series was converted
    mock_read_pandas.assert_called_once_with(pd_series)

    # Verify _apply_nary_op was called
    mock_apply_nary_op.assert_called_once()
    processed_args = mock_apply_nary_op.call_args[0][1]

    # Processed args should be:
    # 1. bf_series (converted from pd_series)
    # 2. A new Series (projected from the expression onto bf_series' block)
    # 3. Literal 42
    assert isinstance(processed_args[0], series.Series)
    assert processed_args[0] is bf_series

    assert isinstance(processed_args[1], series.Series)
    assert processed_args[1] is not bf_series

    assert processed_args[2] == 42

    assert isinstance(result, series.Series)
172+
173+
174+
def test_apply_googlesql_scalar_op_pandas_series_with_bf_dataframe(monkeypatch):
    """A pandas Series is uploaded via the BigFrames DataFrame's session, not the global one."""
    # Setup mock session 2 (associated with bf_dataframe)
    bf_session = mocks.create_bigquery_session(session_id="bf_session")

    # Create a bf_dataframe associated with bf_session
    bf_dataframe = mocks.create_dataframe(
        monkeypatch, session=bf_session, data={"col": [1, 2, 3]}
    )
    bf_series = bf_dataframe["col"]

    # Setup mock session 1 (global) AFTER creating the dataframe
    global_session = mocks.create_bigquery_session(session_id="global")
    monkeypatch.setattr(
        bigframes.core.global_session, "_global_session", global_session
    )
    # Patch the flag instead of assigning it, so pytest restores the previous
    # value at teardown and the change cannot leak into other tests.
    monkeypatch.setattr(bigframes.options.bigquery, "_session_started", True)

    assert bf_dataframe._session == bf_session

    # Mock read_pandas on both sessions
    mock_global_read_pandas = mock.MagicMock()
    global_session.read_pandas = mock_global_read_pandas  # type: ignore

    mock_bf_read_pandas = mock.MagicMock(return_value=bf_series)
    bf_session.read_pandas = mock_bf_read_pandas  # type: ignore

    # Mock _apply_nary_op
    mock_apply_nary_op = mock.MagicMock(return_value=bf_series)
    monkeypatch.setattr(series.Series, "_apply_nary_op", mock_apply_nary_op)

    pd_series = pd.Series([1, 2, 3])

    # Call with pandas Series and BigFrames DataFrame
    result = core_googlesql.apply_googlesql_scalar_op(_TEST_OP, pd_series, bf_dataframe)

    # Verify read_pandas was called on bf_session, NOT global_session
    mock_bf_read_pandas.assert_called_once_with(pd_series)
    mock_global_read_pandas.assert_not_called()

    # Verify _apply_nary_op was called
    mock_apply_nary_op.assert_called_once()
    processed_args = mock_apply_nary_op.call_args[0][1]
    assert processed_args[0] is bf_series
    assert processed_args[1] is bf_dataframe

    assert isinstance(result, series.Series)
220+
221+
222+
def test_apply_googlesql_scalar_op_pandas_series_with_bf_index(monkeypatch):
    """A pandas Series is uploaded via the BigFrames Index's session, not the global one."""
    # Setup mock session 2 (associated with bf_index)
    bf_session = mocks.create_bigquery_session(session_id="bf_session")

    # Create a bf_dataframe associated with bf_session to get an index
    bf_dataframe = mocks.create_dataframe(
        monkeypatch, session=bf_session, data={"col": [1, 2, 3]}
    )
    bf_index = bf_dataframe.index
    bf_series = bf_dataframe["col"]

    # Setup mock session 1 (global) AFTER creating the dataframe
    global_session = mocks.create_bigquery_session(session_id="global")
    monkeypatch.setattr(
        bigframes.core.global_session, "_global_session", global_session
    )
    # Patch the flag instead of assigning it, so pytest restores the previous
    # value at teardown and the change cannot leak into other tests.
    monkeypatch.setattr(bigframes.options.bigquery, "_session_started", True)

    assert bf_index._session == bf_session

    # Mock read_pandas on both sessions
    mock_global_read_pandas = mock.MagicMock()
    global_session.read_pandas = mock_global_read_pandas  # type: ignore

    mock_bf_read_pandas = mock.MagicMock(return_value=bf_series)
    bf_session.read_pandas = mock_bf_read_pandas  # type: ignore

    # Mock _apply_nary_op
    mock_apply_nary_op = mock.MagicMock(return_value=bf_series)
    monkeypatch.setattr(series.Series, "_apply_nary_op", mock_apply_nary_op)

    pd_series = pd.Series([1, 2, 3])

    # Call with pandas Series and BigFrames Index
    result = core_googlesql.apply_googlesql_scalar_op(_TEST_OP, pd_series, bf_index)

    # Verify read_pandas was called on bf_session, NOT global_session
    mock_bf_read_pandas.assert_called_once_with(pd_series)
    mock_global_read_pandas.assert_not_called()

    # Verify _apply_nary_op was called
    mock_apply_nary_op.assert_called_once()
    processed_args = mock_apply_nary_op.call_args[0][1]
    assert processed_args[0] is bf_series
    assert processed_args[1] is bf_index

    assert isinstance(result, series.Series)

0 commit comments

Comments
 (0)