Skip to content

Missing pushdowns for pyarrow datasets #703

@adriangb

Description

@adriangb

Some predicates are not pushed down to pyarrow datasets correctly. I'm guessing it's just not implemented but I couldn't find any issue tracking what is and what isn't implemented.

This example is a bit contrived, but the point is that in both the integer case and the timestamp case, DataFusion should push down the filter and thus match no fragments, read no files, and return an empty result set. The missing-file trick is just a way to verify that no files are being read.

from datetime import datetime, timezone
import shutil

import datafusion
import pyarrow
import pyarrow.compute
import pyarrow.dataset
import pyarrow.fs


# --- Integer case -----------------------------------------------------------
# The dataset has a single int64 column 'a' and one fragment whose partition
# expression claims a <= 1. It is then queried with a = 2, which should match
# no fragments, so the (missing) file should never be opened.

shutil.rmtree('data', ignore_errors=True)  # proof that there's no data in the data directory

format = pyarrow.dataset.ParquetFileFormat()
# All fragment paths are resolved relative to the 'data' directory removed above.
filesystem = pyarrow.fs.SubTreeFileSystem('data', pyarrow.fs.LocalFileSystem())
fragments = [
    # note that the partition_expression is totally wrong
    # ('1.parquet' does not exist, so the expression describes no real data;
    # it is only there to give the filter something to prune against)
    format.make_fragment(
        '1.parquet',
        filesystem=filesystem,
        partition_expression=(pyarrow.dataset.field('a') <= pyarrow.compute.scalar(1)),
    )
]

dataset = pyarrow.dataset.FileSystemDataset(
    fragments, pyarrow.schema([pyarrow.field('a', pyarrow.int64())]), format, filesystem
)

# Sanity check on the pyarrow side: filtering on a == 2 is expected to match
# no fragments, since the only fragment is declared as a <= 1.
fragments = list(dataset.get_fragments(pyarrow.dataset.field('a') == pyarrow.scalar(2)))
assert fragments == []

ctx = datafusion.SessionContext()
ctx.register_dataset('dataset', dataset)

# Expected behaviour: the WHERE clause is pushed down to the dataset, no
# fragment matches, no file is read and the result set is empty.
# NOTE(review): presumably this case passes — the issue only marks the
# timestamp case as erroring; confirm.
df = ctx.sql('SELECT * FROM dataset WHERE a = 2')
# NOTE(review): the assertion message is `fragments`, which was already
# asserted to be [] above, so it adds no detail on failure.
assert df.collect() == [], fragments


# --- Timestamp case ---------------------------------------------------------
# The dataset has a single timestamp('ns', '+00:00') column 'a' and one
# fragment whose partition expression claims a == 2000-01-01 (UTC). It is then
# queried with a = 2024-01-01 (UTC), which should match no fragments.

format = pyarrow.dataset.ParquetFileFormat()
# Fragment paths are resolved relative to the 'data' directory.
filesystem = pyarrow.fs.SubTreeFileSystem('data', pyarrow.fs.LocalFileSystem())
fragments = [
    # note that the partition_expression is totally wrong
    # (it does not describe real data; it only gives the filter something to
    # prune against)
    format.make_fragment(
        '1.parquet',
        filesystem=filesystem,
        partition_expression=(
            pyarrow.dataset.field('a') == pyarrow.scalar(datetime(2000, 1, 1, tzinfo=timezone.utc), pyarrow.timestamp('ns', '+00:00'))
        ),
    )
]

dataset = pyarrow.dataset.FileSystemDataset(
    fragments, pyarrow.schema([pyarrow.field('a', pyarrow.timestamp('ns', '+00:00'))]), format, filesystem
)

# Sanity check on the pyarrow side: filtering on a == 2024-01-01 (UTC) is
# expected to match no fragments, since the only fragment is declared as
# a == 2000-01-01 (UTC).
fragments = list(
    dataset.get_fragments(
        pyarrow.dataset.field('a')
        == pyarrow.scalar(datetime(2024, 1, 1, tzinfo=timezone.utc), pyarrow.timestamp('ns', '+00:00'))
    )
)
assert fragments == [], fragments

ctx = datafusion.SessionContext()
ctx.register_dataset('dataset', dataset)

# The timestamp is supplied as a string literal in the SQL predicate.
df = ctx.sql("SELECT * FROM dataset WHERE a = '2024-01-01T00:00:00+00:00'")
# Reported bug: instead of returning an empty result, collect() errors because
# it tries to access the file that doesn't exist, i.e. the timestamp filter
# was not pushed down to the dataset.
assert df.collect() == []

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug — Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions