From a4d4f5aeb3ae19a704565b3029d0dc57872c33fd Mon Sep 17 00:00:00 2001
From: Chelsea Lin
Date: Thu, 30 Oct 2025 17:52:22 +0000
Subject: [PATCH 1/3] fix: managed arrow table iterates None list and struct
 incorrectly

---
 bigframes/core/local_data.py                  | 25 +++++++++---
 tests/system/small/engines/test_read_local.py |  5 ++-
 .../out.sql                                   | 19 ---------
 tests/unit/test_local_data.py                 | 39 +++++++++++++++----
 4 files changed, 53 insertions(+), 35 deletions(-)
 delete mode 100644 tests/unit/core/compile/sqlglot/snapshots/test_compile_readlocal/test_compile_readlocal_w_nested_structs_df/out.sql

diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py
index fa18f00483..43ac105e6f 100644
--- a/bigframes/core/local_data.py
+++ b/bigframes/core/local_data.py
@@ -253,9 +253,16 @@ def _(
         value_generator = iter_array(
             array.flatten(), bigframes.dtypes.get_array_inner_type(dtype)
         )
-        for (start, end) in _pairwise(array.offsets):
-            arr_size = end.as_py() - start.as_py()
-            yield list(itertools.islice(value_generator, arr_size))
+        offset_generator = iter_array(array.offsets, bigframes.dtypes.INT_DTYPE)
+        is_null_generator = iter_array(array.is_null(), bigframes.dtypes.BOOL_DTYPE)
+        previous_offset = next(offset_generator)
+        for is_null, offset in zip(is_null_generator, offset_generator):
+            arr_size = offset - previous_offset
+            previous_offset = offset
+            if is_null:
+                yield None
+            else:
+                yield list(itertools.islice(value_generator, arr_size))
 
     @iter_array.register
     def _(
@@ -267,8 +274,14 @@ def _(
             sub_generators[field_name] = iter_array(array.field(field_name), dtype)
 
         keys = list(sub_generators.keys())
-        for row_values in zip(*sub_generators.values()):
-            yield {key: value for key, value in zip(keys, row_values)}
+        row_values_iter = zip(*sub_generators.values())
+        is_null_iter = array.is_null()
+
+        for is_row_null, row_values in zip(is_null_iter, row_values_iter):
+            if not is_row_null:
+                yield {key: value for key, value in zip(keys, row_values)}
+            else:
+                yield None
 
     for batch in table.to_batches():
         sub_generators: dict[str, Generator[Any, None, None]] = {}
@@ -354,7 +367,7 @@ def _adapt_arrow_array(array: pa.Array) -> tuple[pa.Array, bigframes.dtypes.Dtyp
         new_value = pa.ListArray.from_arrays(
             array.offsets, values, mask=array.is_null()
         )
-        return new_value.fill_null([]), bigframes.dtypes.list_type(values_type)
+        return new_value, bigframes.dtypes.list_type(values_type)
     if array.type == bigframes.dtypes.JSON_ARROW_TYPE:
         return _canonicalize_json(array), bigframes.dtypes.JSON_DTYPE
     target_type = logical_type_replacements(array.type)
diff --git a/tests/system/small/engines/test_read_local.py b/tests/system/small/engines/test_read_local.py
index abdd29c4ac..257bddd917 100644
--- a/tests/system/small/engines/test_read_local.py
+++ b/tests/system/small/engines/test_read_local.py
@@ -88,8 +88,9 @@ def test_engines_read_local_w_zero_row_source(
     assert_equivalence_execution(local_node, REFERENCE_ENGINE, engine)
 
 
-# TODO: Fix sqlglot impl
-@pytest.mark.parametrize("engine", ["polars", "bq", "pyarrow"], indirect=True)
+@pytest.mark.parametrize(
+    "engine", ["polars", "bq", "pyarrow", "bq-sqlglot"], indirect=True
+)
 def test_engines_read_local_w_nested_source(
     fake_session: bigframes.Session,
     nested_data_source: local_data.ManagedArrowTable,
diff --git a/tests/unit/core/compile/sqlglot/snapshots/test_compile_readlocal/test_compile_readlocal_w_nested_structs_df/out.sql b/tests/unit/core/compile/sqlglot/snapshots/test_compile_readlocal/test_compile_readlocal_w_nested_structs_df/out.sql
deleted file mode 100644
index 42b7bc7361..0000000000
--- a/tests/unit/core/compile/sqlglot/snapshots/test_compile_readlocal/test_compile_readlocal_w_nested_structs_df/out.sql
+++ /dev/null
@@ -1,19 +0,0 @@
-SELECT
-  *
-FROM UNNEST(ARRAY<STRUCT<`bfcol_0` INT64, `bfcol_1` STRUCT<`name` STRING, `age` INT64, `address` STRUCT<`city` STRING, `country` STRING>>, `bfcol_2` INT64>>[(
-  1,
-  STRUCT(
-    'Alice' AS `name`,
-    30 AS `age`,
-    STRUCT('New York' AS `city`, 'USA' AS `country`) AS `address`
-  ),
-  0
-), (
-  2,
-  STRUCT(
-    'Bob' AS `name`,
-    25 AS `age`,
-    STRUCT('London' AS `city`, 'UK' AS `country`) AS `address`
-  ),
-  1
-)])
\ No newline at end of file
diff --git a/tests/unit/test_local_data.py b/tests/unit/test_local_data.py
index dfd1cd622f..8c265531be 100644
--- a/tests/unit/test_local_data.py
+++ b/tests/unit/test_local_data.py
@@ -20,20 +20,21 @@
 pd_data = pd.DataFrame(
     {
-        "ints": [10, 20, 30, 40],
-        "nested_ints": [[1, 2], [3, 4, 5], [], [20, 30]],
-        "structs": [{"a": 100}, {}, {"b": 200}, {"b": 300}],
+        "ints": [10, 20, 30, 40, 50],
+        "nested_ints": [[1, 2], None, [3, 4, 5], [], [20, 30]],
+        "structs": [{"a": 100}, None, {}, {"b": 200}, {"b": 300}],
     }
 )
 
 pd_data_normalized = pd.DataFrame(
     {
-        "ints": pd.Series([10, 20, 30, 40], dtype=dtypes.INT_DTYPE),
+        "ints": pd.Series([10, 20, 30, 40, 50], dtype=dtypes.INT_DTYPE),
         "nested_ints": pd.Series(
-            [[1, 2], [3, 4, 5], [], [20, 30]], dtype=pd.ArrowDtype(pa.list_(pa.int64()))
+            [[1, 2], None, [3, 4, 5], [], [20, 30]],
+            dtype=pd.ArrowDtype(pa.list_(pa.int64())),
         ),
         "structs": pd.Series(
-            [{"a": 100}, {}, {"b": 200}, {"b": 300}],
+            [{"a": 100}, None, {}, {"b": 200}, {"b": 300}],
             dtype=pd.ArrowDtype(pa.struct({"a": pa.int64(), "b": pa.int64()})),
         ),
     }
 )
@@ -122,11 +123,11 @@ def test_local_data_well_formed_round_trip_chunked():
 
 def test_local_data_well_formed_round_trip_sliced():
     pa_table = pa.Table.from_pandas(pd_data, preserve_index=False)
-    as_rechunked_pyarrow = pa.Table.from_batches(pa_table.slice(2, 4).to_batches())
+    as_rechunked_pyarrow = pa.Table.from_batches(pa_table.slice(0, 4).to_batches())
     local_entry = local_data.ManagedArrowTable.from_pyarrow(as_rechunked_pyarrow)
     result = pd.DataFrame(local_entry.itertuples(), columns=pd_data.columns)
     pandas.testing.assert_frame_equal(
-        pd_data_normalized[2:4].reset_index(drop=True),
+        pd_data_normalized[0:4].reset_index(drop=True),
         result.reset_index(drop=True),
         check_dtype=False,
     )
@@ -143,3 +144,25 @@ def test_local_data_not_equal_other():
     local_entry2 = local_data.ManagedArrowTable.from_pandas(pd_data[::2])
     assert local_entry != local_entry2
     assert hash(local_entry) != hash(local_entry2)
+
+
+def test_local_data_itertuples_struct_none():
+    pd_data = pd.DataFrame(
+        {
+            "structs": [{"a": 100}, None, {"b": 200}, {"b": 300}],
+        }
+    )
+    local_entry = local_data.ManagedArrowTable.from_pandas(pd_data)
+    result = list(local_entry.itertuples())
+    assert result[1][0] is None
+
+
+def test_local_data_itertuples_list_none():
+    pd_data = pd.DataFrame(
+        {
+            "lists": [[1, 2], None, [3, 4]],
+        }
+    )
+    local_entry = local_data.ManagedArrowTable.from_pandas(pd_data)
+    result = list(local_entry.itertuples())
+    assert result[1][0] is None

From 47c4281bc336ecaf3b59d7524ce1d5ed720b79df Mon Sep 17 00:00:00 2001
From: Chelsea Lin
Date: Mon, 3 Nov 2025 22:22:02 +0000
Subject: [PATCH 2/3] fix: iter_array for struct failing on Python 3.9

---
 bigframes/core/local_data.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py
index 43ac105e6f..a6c809f706 100644
--- a/bigframes/core/local_data.py
+++ b/bigframes/core/local_data.py
@@ -274,10 +274,11 @@ def _(
             sub_generators[field_name] = iter_array(array.field(field_name), dtype)
         keys = list(sub_generators.keys())
-        row_values_iter = zip(*sub_generators.values())
-        is_null_iter = array.is_null()
+        is_null_generator = iter_array(array.is_null(), bigframes.dtypes.BOOL_DTYPE)
 
-        for is_row_null, row_values in zip(is_null_iter, row_values_iter):
+        for values in zip(is_null_generator, *sub_generators.values()):
+            is_row_null = values[0]
+            row_values = values[1:]
             if not is_row_null:
                 yield {key: value for key, value in zip(keys, row_values)}
             else:
                 yield None

From d787ceccb0b7ae23eb23bc859bdfa3661a74f247 Mon Sep 17 00:00:00 2001
From: Chelsea Lin
Date: Thu, 6 Nov 2025 18:22:07 +0000
Subject: [PATCH 3/3] undo null list handling because a BigQuery ARRAY does
 not allow null items at materialization

---
 bigframes/core/local_data.py  | 31 +++++++++----------------------
 tests/unit/test_local_data.py |  6 +++---
 2 files changed, 12 insertions(+), 25 deletions(-)

diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py
index a6c809f706..0735c4fc5a 100644
--- a/bigframes/core/local_data.py
+++ b/bigframes/core/local_data.py
@@ -254,14 +254,14 @@ def _(
             array.flatten(), bigframes.dtypes.get_array_inner_type(dtype)
         )
         offset_generator = iter_array(array.offsets, bigframes.dtypes.INT_DTYPE)
-        is_null_generator = iter_array(array.is_null(), bigframes.dtypes.BOOL_DTYPE)
-        previous_offset = next(offset_generator)
-        for is_null, offset in zip(is_null_generator, offset_generator):
-            arr_size = offset - previous_offset
-            previous_offset = offset
-            if is_null:
-                yield None
-            else:
+
+        start_offset = None
+        end_offset = None
+        for offset in offset_generator:
+            start_offset = end_offset
+            end_offset = offset
+            if start_offset is not None:
+                arr_size = end_offset - start_offset
                 yield list(itertools.islice(value_generator, arr_size))
     @iter_array.register
     def _(
@@ -368,7 +368,7 @@ def _adapt_arrow_array(array: pa.Array) -> tuple[pa.Array, bigframes.dtypes.Dtyp
         new_value = pa.ListArray.from_arrays(
             array.offsets, values, mask=array.is_null()
         )
-        return new_value, bigframes.dtypes.list_type(values_type)
+        return new_value.fill_null([]), bigframes.dtypes.list_type(values_type)
     if array.type == bigframes.dtypes.JSON_ARROW_TYPE:
         return _canonicalize_json(array), bigframes.dtypes.JSON_DTYPE
     target_type = logical_type_replacements(array.type)
@@ -505,16 +505,3 @@ def _schema_durations_to_ints(schema: pa.Schema) -> pa.Schema:
     return pa.schema(
         pa.field(field.name, _durations_to_ints(field.type)) for field in schema
     )
-
-
-def _pairwise(iterable):
-    do_yield = False
-    a = None
-    b = None
-    for item in iterable:
-        a = b
-        b = item
-        if do_yield:
-            yield (a, b)
-        else:
-            do_yield = True
diff --git a/tests/unit/test_local_data.py b/tests/unit/test_local_data.py
index 8c265531be..6f23036efb 100644
--- a/tests/unit/test_local_data.py
+++ b/tests/unit/test_local_data.py
@@ -21,7 +21,7 @@
 pd_data = pd.DataFrame(
     {
         "ints": [10, 20, 30, 40, 50],
-        "nested_ints": [[1, 2], None, [3, 4, 5], [], [20, 30]],
+        "nested_ints": [[1, 2], [], [3, 4, 5], [], [20, 30]],
         "structs": [{"a": 100}, None, {}, {"b": 200}, {"b": 300}],
     }
 )
@@ -30,7 +30,7 @@
     {
         "ints": pd.Series([10, 20, 30, 40, 50], dtype=dtypes.INT_DTYPE),
         "nested_ints": pd.Series(
-            [[1, 2], None, [3, 4, 5], [], [20, 30]],
+            [[1, 2], [], [3, 4, 5], [], [20, 30]],
             dtype=pd.ArrowDtype(pa.list_(pa.int64())),
         ),
         "structs": pd.Series(
@@ -165,4 +165,4 @@ def test_local_data_itertuples_list_none():
     pd_data = pd.DataFrame(
         {
             "lists": [[1, 2], None, [3, 4]],
         }
     )
     local_entry = local_data.ManagedArrowTable.from_pandas(pd_data)
     result = list(local_entry.itertuples())
-    assert result[1][0] is None
+    assert result[1][0] == []
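
Net behavior of the series: a null struct row survives itertuples() as None (PATCH 2's zip-based struct iterator), while a null list row is normalized to an empty list on ingestion, because a BigQuery ARRAY cannot hold NULL elements at materialization (PATCH 3 restores fill_null([])). A minimal round-trip sketch, illustrative only and assuming a bigframes checkout that includes all three patches:

    import pandas as pd

    from bigframes.core import local_data

    df = pd.DataFrame(
        {
            "structs": [{"a": 100}, None, {"b": 200}],  # row 1 holds a null struct
            "lists": [[1, 2], None, [3, 4]],  # row 1 holds a null list
        }
    )

    entry = local_data.ManagedArrowTable.from_pandas(df)
    rows = list(entry.itertuples())

    # Null structs are preserved as None, as in test_local_data_itertuples_struct_none.
    assert rows[1][0] is None
    # Null lists are coerced to an empty list, as in
    # test_local_data_itertuples_list_none after PATCH 3.
    assert rows[1][1] == []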