
How to access ADLS folder (delta table storage) from DeltaTable method in deltalake package #392

Closed
prasadvaze opened this issue Aug 17, 2021 · 33 comments
Labels
enhancement New feature or request

Comments


prasadvaze commented Aug 17, 2021

Description

The Delta Rust API seems like a good option for querying a delta table without spinning up a Spark cluster, so I am trying it out from a Python app, following this blog post: https://databricks.com/blog/2020/12/22/natively-query-your-delta-lake-with-scala-java-and-python.html
The "Reading the Delta Table (Python)" section towards the end of that blog shows the snippet
dt = DeltaTable("../rust/tests/data/simple_table")
It is not clear whether this path assumes a local folder (which would be a problem, because it would mean downloading the delta folder to a local drive).
My delta table is on an ADLS (Azure Data Lake Storage) path, and I do not see a way to authenticate and connect to the ADLS folder path and use it in the above call (https://github.com/delta-io/delta-rs).
Am I missing something basic here?
Use Case
Querying a delta table from an Azure Function app without spinning up a Spark cluster.
Related Issue(s)
Not sure how to authenticate and connect from the deltalake library to the ADLS folder where the delta table is stored (I can connect using a blob client, but then cannot use it with the DeltaTable method in the deltalake package). A better code example would help.

@prasadvaze prasadvaze added the enhancement New feature or request label Aug 17, 2021
@prasadvaze (Author)

I think the below should work, but I am still confirming:
dt = DeltaTable("abfss://FileSystemName@StorageAcctName.dfs.core.windows.net/rust/tests/data/simple_table")


thedern commented Aug 20, 2021

The instance of DeltaTable can be created with an 'abfs' file URI; however, the following error will occur when creating a pyarrow table via the to_pyarrow_table method:

dt = DeltaTable(d)  # d is the abfss:// URI shown in the error below
df = dt.to_pyarrow_table()
pyarrow.lib.ArrowInvalid: Unrecognized filesystem type in URI: abfss://mycontainer@mystorageacct.dfs.core.windows.net

I walked through the source code in the stack trace and I believe the ‘abfs’ protocol is the issue. The file system URI protocols supported by pyarrow are s3, hdfs, viewfs, and local files as well as a few others. The 'abfs' protocol does not appear to be supported and when the code separates the ‘file system’ from the file path, the error above is thrown.

Are there future plans for 'abfs' support?
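For what it's worth, the scheme inference described above can be reproduced directly with pyarrow's filesystem factory. A minimal sketch (the abfss URI is the hypothetical one from the error above; behavior as of the pyarrow versions current in this thread):

import pyarrow.fs as pa_fs

# A recognized scheme resolves to a filesystem implementation plus a path.
fs, path = pa_fs.FileSystem.from_uri("file:///tmp/simple_table")
print(type(fs).__name__, path)  # LocalFileSystem /tmp/simple_table

# An unrecognized scheme raises pyarrow.lib.ArrowInvalid, as in the traceback above.
try:
    pa_fs.FileSystem.from_uri("abfss://mycontainer@mystorageacct.dfs.core.windows.net/simple_table")
except Exception as exc:
    print(exc)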


mattc-eostar commented Aug 20, 2021

It may be that you will need to recreate the to_pyarrow_dataset method to use a filesystem object. Dask created one for Azure Data Lake (adlfs). I think the core issue is that the container name is needed in the path for the filesystem.

from typing import Optional, List, Tuple, Any
import adlfs
import os
from urllib.parse import urlparse
from deltalake import DeltaTable
import pyarrow
from pyarrow.dataset import dataset, partitioning

def to_pyarrow_dataset2(
        dt: DeltaTable, fs, container_name, partitions: Optional[List[Tuple[str, str, Any]]] = None
    ) -> pyarrow.dataset.Dataset:
        """
        Build a PyArrow Dataset using data from the DeltaTable.

        :param partitions: A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
        :return: the PyArrow dataset in PyArrow
        """
        if partitions is None:
            file_paths = dt.file_uris()
        else:
            file_paths = dt.files_by_partitions(partitions)
        paths = [urlparse(curr_file) for curr_file in file_paths]

        empty_delta_table = len(paths) == 0
        if empty_delta_table:
            return dataset(
                [],
                schema=dt.pyarrow_schema(),
                partitioning=partitioning(flavor="hive"),
            )

        # Decide based on the first file, if the file is on cloud storage or local
        if paths[0].netloc:
            # NOTE: the AWS_ENDPOINT_URL handling below is carried over from the
            # original to_pyarrow_dataset and is effectively unused here, since an
            # explicit filesystem object is passed to dataset() instead of a URI.
            query_str = ""
            # pyarrow doesn't properly support the AWS_ENDPOINT_URL environment variable
            # for non-AWS S3-like resources. This is a slight hack until such a
            # point when pyarrow learns about AWS_ENDPOINT_URL
            endpoint_url = os.environ.get("AWS_ENDPOINT_URL")
            if endpoint_url is not None:
                endpoint = urlparse(endpoint_url)
                # This format specific to the URL schema inference done inside
                # of pyarrow, consult their tests/dataset.py for examples
                query_str += (
                    f"?scheme={endpoint.scheme}&endpoint_override={endpoint.netloc}"
                )

            # adlfs expects the container name prepended to each object path
            keys = [container_name + curr_file.path for curr_file in paths]
            return dataset(
                keys,
                schema=dt.pyarrow_schema(),
                filesystem=fs,
                partitioning=partitioning(flavor="hive"),
            )
        else:
            return dataset(
                file_paths,
                schema=dt.pyarrow_schema(),
                format="parquet",
                partitioning=partitioning(flavor="hive"),
            )
        
# Fill in your storage account credentials
storage_options = {
    'account_name': '',
    'account_key': ''
}

fs = adlfs.AzureBlobFileSystem(**storage_options)

# dt is the DeltaTable created earlier, e.g.
# dt = DeltaTable("abfss://container@account.dfs.core.windows.net/path/to/table")
df = to_pyarrow_dataset2(dt, fs, 'container_name').to_table().to_pandas()


mattc-eostar commented Aug 20, 2021

If I have some extra time in the next week, I can perhaps put together a PR that handles this use case. I think it would make it easier to support other FS as well. Or even if the FS object is passed to the main delta table initialization, then maybe it could be passed along as needed into any of the methods. Thoughts?

Edit: would need to handle the generic fsspec object specification and then anything that adheres to that could be passed in.

@mattc-eostar

The code needs to account for the container name. When fs.open() is called inside pyarrow.dataset and the path of one of the files is passed in, it fails because, for adlfs, you need to prepend the path with the container name even though it is specified in the URI...sorta weird, but the code above does that prepend.
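A minimal sketch of that prepend behavior (account, container, and file names are made up for illustration):

import adlfs

# Hypothetical credentials; adlfs authenticates at the account level.
fs = adlfs.AzureBlobFileSystem(account_name="mystorageacct", account_key="<key>")

# adlfs addresses objects as "<container>/<path>", so the container has to be
# prepended even though the original DeltaTable URI already names it.
with fs.open("mycontainer/tests/data/simple_table/part-00000.parquet", "rb") as f:
    print(f.read(4))  # parquet files begin with the magic bytes b"PAR1"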


thedern commented Aug 24, 2021

Thanks for the sample code. I implemented it and it did indeed create the data frame.

@prasadvaze (Author)

@mattc-eostar
Matthew, thanks for posting the code for the to_pyarrow_dataset2() method. We (Darren and I, who are working on this issue) appreciate your help. As Darren mentioned, we are now able to use the DeltaTable method in the Rust deltalake package (crate) to query a delta table stored in ADLS.
We roughly understand the difference between your to_pyarrow_dataset2() and the original to_pyarrow_dataset() method in the deltalake package, but we don't understand your earlier comment:
"If I have some extra time in the next week, I can perhaps put together a PR that handles this use case. I think it would make it easier to support other FS as well. Or even if the FS object is passed to the main delta table initialization, then maybe it could be passed along as needed into any of the methods. Thoughts? Edit: would need to handle the generic fsspec object specification and then anything that adheres to that could be passed in."

Are you looking for our opinion or feedback?

@mattc-eostar

@thedern glad to hear it helped. definitely hacky for now, but it works!

@prasadvaze not sure if and when I will have the time to submit a PR. I think that it could be tricky/confusing to implement since the package requires environment variables to be set and this hack requires the use of a filesystem object. I think it would be best for the package to support file system objects in general and then this falls right into place.

The container name issue also seems to be unique to Azure, or at the very least to that specific fsspec implementation of adlfs by Dask.
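For reference, a sketch of the environment-variable route mentioned above, using the variable names that show up later in this thread (the exact names may differ between versions):

import os
from deltalake import DeltaTable

# Hypothetical account and table; credentials are picked up from the environment
# by the Rust storage backend rather than passed to the constructor.
os.environ["AZURE_STORAGE_ACCOUNT_NAME"] = "mystorageacct"
os.environ["AZURE_STORAGE_ACCOUNT_KEY"] = "<account-key>"

dt = DeltaTable("abfss://mycontainer@mystorageacct.dfs.core.windows.net/path/to/table")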


mattc-eostar commented Aug 24, 2021

This is the key difference: you need to prepend the container name and pass in a filesystem object from adlfs rather than a URI.

            keys = [container_name+curr_file.path for curr_file in paths]
            return dataset(
                keys,
                schema=dt.pyarrow_schema(),
                filesystem=fs,
                partitioning=partitioning(flavor="hive"),
            )

It would be piggybacking on how pyarrow accomplishes working with FS objects: https://arrow.apache.org/docs/python/filesystems.html

This library could implement and maintain its own versions of fsspec objects for aws, gcp, and azure to avoid inconsistencies in behavior. For example, https://github.com/dask/adlfs

When calling fs.open() using adlfs, you need to prepend the container name to the file path instead of passing a container name at instantiation and using that for subsequent calls. If deltalake were to have its own internal implementation of these filesystems, then that could be controlled. That being said, I think the issue is that when the DeltaTable object is created and you get the underlying file paths, those paths do not include the container name when pulled from Azure. So either that needs to change, or adlfs needs to change, to make this work without the hack I put together.

Hopefully my ramblings make sense!


thedern commented Aug 24, 2021

I can confirm that the file paths returned do not have the container name associated with them; just what amounts to a 'relative' file path is passed back. Prepending the container name solved the issue. Thanks again.


mattc-eostar commented Aug 24, 2021

@thedern when you say solved, does that mean a PR is planned?

also, does that mean a filesystem object is not necessary as long as the container name is given? that would make the change much simpler.


thedern commented Aug 24, 2021

@mattc-eostar, I have not planned a PR at this time. Using the workaround you provided where the container name is prepended to the file path worked for us in our testing. We did not address the filesystem object, just tried your container name insert.

prasadvaze (Author) commented Aug 30, 2021

@fvaleye and @houqp Thanks so much for updating the original method.
Either @thedern or I will try and confirm if it works as expected.
@mattc-eostar Thanks for the workaround in the meantime.

@prasadvaze (Author)

Using deltalake.DeltaTable (the Rust implementation) on Ubuntu, we used blobfuse instead of adlfs to connect to and mount the ADLS store, so we bypassed the to_pyarrow_dataset() method when creating a dataframe from the delta table. On Windows we have to use adlfs, and we will then try the new to_pyarrow_dataset() method updated by @fvaleye soon.
Querying a ~50k-row delta table took ~2 sec using blobfuse, while the same query took ~6 sec using adlfs.
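For reference, with a blobfuse mount the reader only ever sees a local path, so the plain local flow from earlier in the thread applies; a minimal sketch, assuming the container is mounted at /mnt/adls (hypothetical mount point):

from deltalake import DeltaTable

# The ADLS container is assumed to be mounted at /mnt/adls via blobfuse.
dt = DeltaTable("/mnt/adls/path/to/delta_table")
df = dt.to_pyarrow_table().to_pandas()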

@devilaadi

Hi @mattc-eostar,

We are getting the error 'str' object has no attribute 'file_uris' when using your to_pyarrow_dataset2 function:
df = to_pyarrow_dataset2('sales_product', fs, 'shared').to_table().to_pandas()

sales_product is my table name and shared is my container name.


mattc-eostar commented Sep 6, 2021 via email

houqp (Member) commented Sep 7, 2021

The object URIs returned by the Rust core should contain the full URI, including the container name, for ADLS; see the file_paths variable at:

paths = [urlparse(curr_file) for curr_file in file_paths]

I believe the problem is that we pass those paths to urlparse and then only reference the relative path component of the URI when we construct the object keys:

keys = [curr_file.path for curr_file in paths]

We shouldn't need to pass in the container name as an extra argument, because it should already be part of the URIs returned by the Rust core. If we have to make a special case for ADLS in the to_pyarrow_dataset method, I am fine with that. I think we already have a special case for S3 right now.
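To make the path-component point concrete, a small illustration with a hypothetical abfss-style URI: the container only appears in the netloc, so keeping just .path drops it, which is why the workaround re-prepends the container name.

from urllib.parse import urlparse

u = urlparse("abfss://mycontainer@mystorageacct.dfs.core.windows.net/tests/data/part-00000.parquet")
print(u.netloc)  # mycontainer@mystorageacct.dfs.core.windows.net
print(u.path)    # /tests/data/part-00000.parquet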


devilaadi commented Sep 7, 2021

Please take a look at the function definition. You have to pass it a DeltaTable object that points to your delta table in your storage account. You cannot just pass it the name of the table. You may be able to modify the function to do what you need, though.


@mattc-eostar Thanks for the reply, it is working now with your to_pyarrow_dataset2 function.

@mattc-eostar

@devilaadi Check out the docs here.

dt = DeltaTable("abfss://FileSystemName@StorageAcctName.dfs.core.windows.net/rust/tests/data/simple_table)

@mattc-eostar

The object URIs returned by the Rust core should contain the full URI, including the container name, for ADLS; see the file_paths variable at:

paths = [urlparse(curr_file) for curr_file in file_paths]

I believe the problem is that we pass those paths to urlparse and then only reference the relative path component of the URI when we construct the object keys:

keys = [curr_file.path for curr_file in paths]

We shouldn't need to pass in the container name as an extra argument, because it should already be part of the URIs returned by the Rust core. If we have to make a special case for ADLS in the to_pyarrow_dataset method, I am fine with that. I think we already have a special case for S3 right now.

I think that makes sense. And then if it becomes a pain to keep adding/maintaining new special cases, it could always be abstracted into a more robust cloud provider handler class or something.

As long as the information is already available within the class, the container parameter could be removed immediately. The filesystem object needs to remain, unless it could be passed in upon creation of the delta table, since it is sort of a one-to-one thing anyway.

houqp (Member) commented Sep 8, 2021

Does the ADLS filesystem library support setting the container name on creation? I am wondering whether that would let it handle paths without container names. This is how things work for other backends, i.e. we initialize the filesystem and the table root path, then pass in relative paths for each file.

@mattc-eostar

Does the ADLS filesystem library support setting the container name on creation? I am wondering whether that would let it handle paths without container names. This is how things work for other backends, i.e. we initialize the filesystem and the table root path, then pass in relative paths for each file.

I am not sure that this particular implementation allows for that. But could be wrong.

houqp (Member) commented Oct 21, 2021

Can anyone build the latest Python binding from source and give it a try? @roeap introduced a big improvement to reuse the native Rust storage backend in Python.

@francisco-ltech

Is anyone having the same issue but in the rust package?

abfss://container@storagename.dfs.core.windows.net/folder/delta-0.8.0

Error: Failed to read delta log object: Invalid object URI

Caused by:
0: Invalid object URI
1: Invalid URI scheme: abfss

thovoll (Contributor) commented Jan 31, 2022

Is anyone having the same issue but in the rust package?

abfss://container@storagename.dfs.core.windows.net/folder/delta-0.8.0

Error: Failed to read delta log object: Invalid object URI

Caused by: 0: Invalid object URI 1: Invalid URI scheme: abfss

We switched from abfss:// to adls2:// in #499. Can you give more context on when you get this error?

@francisco-ltech

@thovoll Thanks for the pointer.

I have set these two ENV VARS:

  • AZURE_STORAGE_ACCOUNT_NAME=myaccountname.dfs.core.windows.net
  • AZURE_STORAGE_ACCOUNT_KEY=mykey

Then:

let account = std::env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap();
let table = deltalake::open_table(format!("adls2://{}/container/folder/delta-0.8.0/", account).as_str())
    .await
    .unwrap();

Error:
thread 'main' panicked at 'called Result::unwrap() on an Err value: StorageError { source: Uri { source: InvalidScheme("adls2") } }'

Are paths not supported or am I missing something obvious?

Thanks!

thovoll (Contributor) commented Jan 31, 2022

Try AZURE_STORAGE_ACCOUNT_NAME=myaccountname, not the full URL

roeap (Collaborator) commented Jan 31, 2022

As @thovoll points out, it looks like the error you see is related to the scheme being changed, and I agree that the account variable should only refer to the account name.

@thovoll - while abfss technically refers to something we are not using here, do you think we should maybe support both schemes for backwards compatibility?

@francisco-ltech

@thovoll Same error even with myaccountname only (not full url).

Looking at the code base, I think this is where the storage name is evaluated, and I'm wondering if I'm missing something in my code to indicate the use of feature=azure?

"adls2" => {
            cfg_if::cfg_if! {
                if #[cfg(feature = "azure")] {
                    let mut path_parts = parts[1].splitn(3, '/');
                    let account_name = match path_parts.next() {
                        Some(x) => x,
                        None => {
                            return Err(UriError::MissingObjectAccount);
                        }
                    };
                    let file_system = match path_parts.next() {
                        Some(x) => x,
                        None => {
                            return Err(UriError::MissingObjectFileSystem);
                        }
                    };
                    let path = match path_parts.next() {
                        Some(x) => x,
                        None => {
                            return Err(UriError::MissingObjectPath);
                        }
                    };

                    Ok(Uri::AdlsGen2Object(azure::AdlsGen2Object { account_name, file_system, path }))
                } else {
                    Err(UriError::InvalidScheme(String::from(parts[0])))
                }
            }
        }
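For readers following along in Python, the splitn-based parsing in the excerpt above amounts to roughly this (hypothetical URI; sketch only):

# adls2://<account_name>/<file_system>/<path>
uri = "adls2://myaccountname/mycontainer/folder/delta-0.8.0"
account_name, file_system, path = uri.removeprefix("adls2://").split("/", 2)
print(account_name, file_system, path)  # myaccountname mycontainer folder/delta-0.8.0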

roeap (Collaborator) commented Jan 31, 2022

@francisco-ltech - you are correct. For some reason I was thinking about the Python bindings, where azure is enabled by default. You need to activate the azure feature when working with the Rust crate directly.

Also, as of now you need to use a git reference in your Cargo.toml when using Azure. Since the crates were not published to crates.io yet, they are not available in the published deltalake crate. We'll be able to fix that quite soon, since the Azure crates are now being released; however, there are still a few features missing that we'd like to use here.

deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "b713c4232b04de44658c9946734a78b170857603", features = [
  "azure",
]}

@francisco-ltech

@roeap - That makes sense now, thank you!

thovoll (Contributor) commented Jan 31, 2022

@thovoll - while abfss is technically referring to something we are not using here, do you thing we should maybe support both schemes for backwards compatibility?

I'd rather see if adls2 sticks, version 0.x is the time to figure this out :)

roeap (Collaborator) commented May 15, 2022

This issue has been open for quite a while, and I have successfully been using the Azure backend in multiple scenarios. Also, a lot of work has been done on the Azure side since then. Closing this for now, but feel free to re-open if this is still relevant.

@roeap roeap closed this as completed May 15, 2022