Commit 0384b4e

Less is more 😍
1 parent fa9b3ca commit 0384b4e

3 files changed: +6, -28 lines

pyiceberg/io/pyarrow.py

Lines changed: 4 additions & 26 deletions
@@ -99,7 +99,6 @@
     HDFS_KERB_TICKET,
     HDFS_PORT,
     HDFS_USER,
-    PYARROW_USE_LARGE_TYPES_ON_READ,
     S3_ACCESS_KEY_ID,
     S3_CONNECT_TIMEOUT,
     S3_ENDPOINT,
@@ -1348,7 +1347,6 @@ def _task_to_record_batches(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
-    use_large_types: Optional[bool] = True,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
     _, _, path = _parse_location(task.file.file_path)
@@ -1376,21 +1374,13 @@ def _task_to_record_batches(
 
         file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
 
-        fragment_schema = physical_schema
-        if use_large_types is not None:
-            fragment_schema = (
-                _pyarrow_schema_ensure_large_types(physical_schema)
-                if use_large_types
-                else (_pyarrow_schema_ensure_small_types(physical_schema))
-            )
-
         fragment_scanner = ds.Scanner.from_fragment(
             fragment=fragment,
             # With PyArrow 16.0.0 there is an issue with casting record-batches:
             # https://github.com/apache/arrow/issues/41884
             # https://github.com/apache/arrow/issues/43183
             # Would be good to remove this later on
-            schema=fragment_schema,
+            schema=physical_schema,
             # This will push down the query to Arrow.
             # But in case there are positional deletes, we have to apply them first
             filter=pyarrow_filter if not positional_deletes else None,
@@ -1425,7 +1415,6 @@ def _task_to_record_batches(
                 file_project_schema,
                 current_batch,
                 downcast_ns_timestamp_to_us=True,
-                use_large_types=use_large_types,
             )
 
             # Inject projected column values if available
@@ -1539,12 +1528,8 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
         deletes_per_file = _read_all_delete_files(self._io, tasks)
         executor = ExecutorFactory.get_or_create()
 
-        use_large_types = None
-        if PYARROW_USE_LARGE_TYPES_ON_READ in self._io.properties:
-            use_large_types = property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
-
         def _table_from_scan_task(task: FileScanTask) -> pa.Table:
-            batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, use_large_types))
+            batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
             if len(batches) > 0:
                 return pa.Table.from_batches(batches)
             else:
@@ -1606,13 +1591,12 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
         deletes_per_file = _read_all_delete_files(self._io, tasks)
         # Always use large types, since we cannot infer it in a streaming fashion,
         # without fetching all the schemas first, which defeats the purpose of streaming
-        return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file, use_large_types=True)
+        return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)
 
     def _record_batches_from_scan_tasks_and_deletes(
         self,
         tasks: Iterable[FileScanTask],
         deletes_per_file: Dict[str, List[ChunkedArray]],
-        use_large_types: Optional[bool] = True,
     ) -> Iterator[pa.RecordBatch]:
         total_row_count = 0
         for task in tasks:
@@ -1627,7 +1611,6 @@ def _record_batches_from_scan_tasks_and_deletes(
                 deletes_per_file.get(task.file.file_path),
                 self._case_sensitive,
                 self._table_metadata.name_mapping(),
-                use_large_types,
                 self._table_metadata.spec(),
             )
             for batch in batches:
@@ -1646,13 +1629,12 @@ def _to_requested_schema(
     batch: pa.RecordBatch,
     downcast_ns_timestamp_to_us: bool = False,
     include_field_ids: bool = False,
-    use_large_types: Optional[bool] = True,
 ) -> pa.RecordBatch:
     # We could reuse some of these visitors
     struct_array = visit_with_partner(
         requested_schema,
         batch,
-        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids, use_large_types),
+        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids),
         ArrowAccessor(file_schema),
     )
     return pa.RecordBatch.from_struct_array(struct_array)
@@ -1669,12 +1651,10 @@ def __init__(
         file_schema: Schema,
         downcast_ns_timestamp_to_us: bool = False,
         include_field_ids: bool = False,
-        use_large_types: Optional[bool] = True,
     ) -> None:
         self._file_schema = file_schema
         self._include_field_ids = include_field_ids
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
-        self._use_large_types = use_large_types
 
     def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         file_field = self._file_schema.find_field(field.field_id)
@@ -1684,8 +1664,6 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
                 target_schema = schema_to_pyarrow(
                     promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
                 )
-                if self._use_large_types is False:
-                    target_schema = _pyarrow_schema_ensure_small_types(target_schema)
                 return values.cast(target_schema)
             elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
                 if field.field_type == TimestampType():
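
For context, and not part of this commit's code: the removed _pyarrow_schema_ensure_large_types / _pyarrow_schema_ensure_small_types helpers rewrote an Arrow schema so that variable-width columns use the 64-bit ("large") or 32-bit ("small") offset variants. A rough illustrative sketch of that idea in plain PyArrow (ensure_large here is a hypothetical stand-in, not the pyiceberg implementation):

import pyarrow as pa

def ensure_large(dtype: pa.DataType) -> pa.DataType:
    # Hypothetical helper for illustration: upcast variable-width types to their large variants.
    if pa.types.is_string(dtype):
        return pa.large_string()
    if pa.types.is_binary(dtype):
        return pa.large_binary()
    if pa.types.is_list(dtype):
        return pa.large_list(ensure_large(dtype.value_type))
    return dtype

schema = pa.schema([pa.field("name", pa.string()), pa.field("tags", pa.list_(pa.string()))])
print(pa.schema([f.with_type(ensure_large(f.type)) for f in schema]))
# name: large_string
# tags: large_list<item: large_string>

After this change the scanner reads each fragment with its physical schema as-is, so this per-file up- or down-casting is no longer needed.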

pyiceberg/table/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -1750,7 +1750,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
         return pa.RecordBatchReader.from_batches(
             target_schema,
             batches,
-        )
+        ).cast(target_schema)
 
     def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
         """Read a Pandas DataFrame eagerly from this Iceberg table.

tests/integration/test_reads.py

Lines changed: 1 addition & 1 deletion
@@ -881,7 +881,7 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
     expected_schema = pa.schema(
         [
             pa.field("string", pa.string()),
-            pa.field("string-to-binary", pa.binary()),
+            pa.field("string-to-binary", pa.large_binary()),
             pa.field("binary", pa.binary()),
             pa.field("list", pa.list_(pa.string())),
         ]
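
The updated expectation pins down the user-visible effect: the "string-to-binary" column, which is promoted from string to binary on read, now comes back as pa.large_binary() because the projection visitor no longer downcasts promoted types when small types are requested. A hedged usage sketch for checking the resulting Arrow types (catalog and table names are placeholders, not from this commit):

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")               # placeholder catalog name
tbl = catalog.load_table("default.some_table")  # placeholder table identifier

# Inspect the Arrow schema a scan produces after this change; promoted columns
# use the default (large) Arrow variants such as large_binary.
print(tbl.scan().to_arrow().schema)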
