diff --git a/paimon-python/pypaimon/read/datasource/ray_datasource.py b/paimon-python/pypaimon/read/datasource/ray_datasource.py index b08d4c1e50df..c8a3fa8fc929 100644 --- a/paimon-python/pypaimon/read/datasource/ray_datasource.py +++ b/paimon-python/pypaimon/read/datasource/ray_datasource.py @@ -157,7 +157,10 @@ def _get_read_task( if batch.num_rows == 0: continue has_data = True - yield pyarrow.Table.from_batches([batch], schema=schema) + table = pyarrow.Table.from_batches([batch]) + if table.schema != schema: + table = table.cast(schema) + yield table if not has_data: yield pyarrow.Table.from_arrays( diff --git a/paimon-python/pypaimon/tests/ray_integration_test.py b/paimon-python/pypaimon/tests/ray_integration_test.py index 4d9613c4994d..f0664168ee36 100644 --- a/paimon-python/pypaimon/tests/ray_integration_test.py +++ b/paimon-python/pypaimon/tests/ray_integration_test.py @@ -345,6 +345,32 @@ def test_read_paimon_invalid_override_num_blocks(self): read_paimon('default.does_not_matter', self.catalog_options, override_num_blocks=0) + def test_read_paimon_pk_single_snapshot(self): + """read_paimon on a PK table with a single snapshot (raw-convertible + splits) must not raise ArrowInvalid on schema nullability mismatch. + + The Paimon table schema marks PK columns as NOT NULL, but the + Parquet reader may produce nullable fields. The RayDatasource + read task must cast the batch to align the schema rather than + rejecting it via strict from_batches equality. + """ + from pypaimon.ray import read_paimon + + pa_schema = pa.schema([ + pa.field('id', pa.int32(), nullable=False), + ('name', pa.string()), + ]) + identifier = self._create_and_populate_table( + 'test_read_pk_single_snap', pa_schema, + {'id': [1, 2, 3], 'name': ['a', 'b', 'c']}, + primary_keys=['id'], options={'bucket': '2'}, + ) + + ds = read_paimon(identifier, self.catalog_options) + self.assertEqual(ds.count(), 3) + df = ds.to_pandas().sort_values('id').reset_index(drop=True) + self.assertEqual(list(df['id']), [1, 2, 3]) + if __name__ == '__main__': unittest.main()