
SELECT on Hive Partitioned PyArrow Dataset fails when File System is ADLFS #5305

Closed
russellpierce opened this issue Nov 11, 2022 · 6 comments

russellpierce commented Nov 11, 2022

What's happening: The DuckDB SELECT of a Hive-partitioned Arrow dataset hangs when the dataset is read from an adlfs.spec.AzureBlobFileSystem.

What I expect: The DuckDB SELECT of a Hive-partitioned Arrow dataset should behave essentially the same regardless of whether an adlfs.spec.AzureBlobFileSystem or an fsspec.implementations.local.LocalFileSystem is used.

What I'm seeing is that con.execute("select * from example_ds").arrow() hangs silently when example_ds is Hive-partitioned and the backing file system is adlfs for Azure Data Lake Gen 2 (specifically an adlfs.spec.AzureBlobFileSystem). Notably, example_ds.to_table() using Arrow alone completes without complaint.

I tested this in the docker image python:3.10-buster.

I did a fresh install using pip install duckdb pyarrow adlfs yielding a pip freeze of:

adal==1.2.7
adlfs==2022.10.0
aiohttp==3.8.3
aiosignal==1.3.1
async-timeout==4.0.2
attrs==22.1.0
azure-core==1.26.1
azure-datalake-store==0.0.52
azure-identity==1.12.0
azure-storage-blob==12.14.1
certifi==2022.9.24
cffi==1.15.1
charset-normalizer==2.1.1
cryptography==38.0.3
duckdb==0.5.1
frozenlist==1.3.3
fsspec==2022.11.0
idna==3.4
isodate==0.6.1
msal==1.20.0
msal-extensions==1.0.0
msrest==0.7.1
multidict==6.0.2
numpy==1.23.4
oauthlib==3.2.2
portalocker==2.6.0
pyarrow==10.0.0
pycparser==2.21
PyJWT==2.6.0
python-dateutil==2.8.2
requests==2.28.1
requests-oauthlib==1.3.1
six==1.16.0
typing_extensions==4.4.0
urllib3==1.26.12
yarl==1.8.1

I have a script that tests the local file system and the remote file system, with and without partitioning. It hangs only in the remote-with-partitioning case. To make this example work, you'll need to set an env var named azure_connection_string_secret and modify the line that assigns the base directory to write to, viz. the variable base_dir. My apologies if the Python code is unattractive/non-pythonic... I'm still learning.

import pyarrow as pa
from pyarrow.dataset import HivePartitioning
import duckdb
import datetime as dt
import os

from fsspec.implementations.local import LocalFileSystem
fs_local = LocalFileSystem()

import adlfs
azure_fs = adlfs.AzureBlobFileSystem(connection_string=os.getenv("azure_connection_string_secret"))

# need to write example files
base_dir = "ds-general-cool/russellrepex"
con = duckdb.connect()
con.execute("PRAGMA threads=1;")
con.execute("PRAGMA enable_progress_bar;")
con.execute("PRAGMA enable_profiling;")
con.execute("SET explain_output='all';")

print(f"{pa.__version__=}")
print(f"{duckdb.__version__=}")
print(f"{adlfs.__version__=}")

# Create fake data
example_dag_time = dt.datetime.now().replace(hour=0,minute=0, second=0, microsecond=0)
example_dag_time_not_break = dt.datetime.now().replace(hour=1,minute=30, second=0, microsecond=0)
example_dict = {
    "timestamp": [dt.datetime.now(), dt.datetime.now()-dt.timedelta(days=1), dt.datetime.now()],
    "stringfield": ["a", "b", "dsadsa"],
    "floatfield": [1, None, 3.14],
    "DAG_TIME": [example_dag_time,example_dag_time, example_dag_time_not_break]
}
df_a = pa.Table.from_pydict(example_dict)

example_dict_2 = {
    "timestamp": [dt.datetime.now()-dt.timedelta(days=2), dt.datetime.now()-dt.timedelta(days=3), dt.datetime.now()],
    "stringfield": ["a", None, "example_dict_2"],
    "floatfield": [1, None, 3.14],
    "DAG_TIME": [example_dag_time-dt.timedelta(days=1),example_dag_time-dt.timedelta(days=1), example_dag_time_not_break-dt.timedelta(days=1)]
}
df_b = pa.Table.from_pydict(example_dict_2)

# Define schema
pa_schema = pa.schema([("DAG_TIME", pa.timestamp('ms'))])
part_var = HivePartitioning(pa_schema)

for fs, name in zip([fs_local, azure_fs], ["local", "azure"]):
    for do_partition in (False, True):
        if do_partition:
            part_arg = part_var
        else:
            part_arg = None
        print(f"trying: {name} with partitioning as {do_partition}")

        if name == "azure":
            # I'm not going to let this run automatically, uncomment it if you want it
            # print(f"Deleting current contents of {base_dir}")
            # azure_fs.rm(base_dir, recursive=True)
            pass

        print(f"Writing data ({name})")
        # existing_data_behavior arg not available in pyarrow==6.0.0 but is in 10.0.0

        pa.dataset.write_dataset(
            df_a,
            base_dir=base_dir,
            partitioning=part_arg,
            #partitioning=['DAG_TIME'],
            #partitioning_flavor="hive",
            format="parquet",
            existing_data_behavior="delete_matching",
            filesystem=fs
        )
        pa.dataset.write_dataset(
            df_b,
            base_dir=base_dir,
            partitioning=part_arg,
            #partitioning=['DAG_TIME'],
            #partitioning_flavor="hive",
            format="parquet",
            existing_data_behavior="delete_matching",
            filesystem=fs
        )

        print("Populating dataset")
        example_ds = pa.dataset.dataset(
            base_dir,
            filesystem=fs,
            partitioning=part_arg
        )

        # the data is there and loads
        print("demonstrating full table load")
        print(example_ds.to_table())

        # duckdb is there and responsive
        print("demonstrating duckdb connection")
        print(con.execute("select 1").arrow())

        print("Checking if we can explain the query")
        print(con.execute("explain SELECT * from example_ds").fetchall()[0][1])

        # This command hangs - hard, no escape via ctrl-c --- under some package combos
        print("Checking for hang")
        print(con.execute("select * from example_ds").arrow())

Note that earlier versions were run by hand with better cleanup between the partitioned and non-partitioned loops, and the result was the same. There is a line up there you can uncomment, but I didn't feel comfortable posting code that would issue a delete command on someone else's file system.

cc @jwills


hannes commented Nov 12, 2022

It could be due to the way DuckDB parallelises the read. Can you try single-threaded mode (PRAGMA threads=1;)?

russellpierce commented

The hang still occurs with PRAGMA threads=1;. I added the progress bar and profiling as well; during the hang, neither produces any further output. I also added an EXPLAIN on the query: it explains successfully even though execution hangs.

Source code updated in the original post to reflect these changes.

pdutta777 commented

I am facing similar problems, even on non-partitioned data in ADLFS. After some experimentation, I observed that if I limit the Arrow dataset to a single parquet file on ADLS, the code completes. This works for up to 3 parquet files in the dataset; any more and it hangs.

This is my environment:

  • Apple M1 Max, 32 GB memory
  • Python 3.10.8
  • Packages
    • adlfs (2022.11.2)
    • duckdb (0.6.0)
    • fsspec (2022.11.0)
    • pyarrow (10.0.1)

pdutta777 commented

After upgrading to duckdb 0.6.1, the program hangs even when using a single parquet file.

github-actions bot commented

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 30 days.

@github-actions github-actions bot added the stale label Jul 29, 2023
github-actions bot commented

This issue was closed because it has been stale for 30 days with no activity.

github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) Aug 28, 2023