Conversation

@alexkruc (Contributor) commented Oct 5, 2022

This PR adds the ability for the download_file method to keep the original file name of the file as it is stored on S3.

Currently, the method always downloads into an auto-generated temp file with a hardcoded prefix, as shown here:

with NamedTemporaryFile(dir=local_path, prefix='airflow_tmp_', delete=False) as local_tmp_file:
    s3_obj.download_fileobj(

As the original ticket states, users sometimes need to keep the file name exactly as it is on S3. To accomplish this, I've added the parameter preserve_file_name which, if set to True, renames the temporary file to the name of the S3 object that was downloaded.

The default is False, meaning the auto-generated file name is kept and used, preserving the existing API and avoiding a breaking change.
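
A minimal usage sketch of the new flag (assuming the amazon provider's S3Hook; the bucket, key, and local path values are hypothetical):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

hook = S3Hook()
# Keep the original S3 object name instead of an airflow_tmp_* name
local_file = hook.download_file(
    key="path/to/report.json",
    bucket_name="my-bucket",
    local_path="/tmp/downloads",
    preserve_file_name=True,
)
print(local_file)  # ends with .../report.json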

closes: #23514
related: #23654 (closed; it tried to implement the same thing in a different way)


@kaxil (Member) left a comment

@o-nikolas (Contributor) left a comment

Thanks for the contribution! I left a wording nit-pick and a suggestion for a refactor. Feel free to push back on the latter ;)

)

return local_tmp_file.name
if preserve_file_name:
@o-nikolas (Contributor):

It feels to me like the conditional should happen earlier, before the temporary file name is created; it's a bit strange to change the name afterwards when we could just download it under the correct name (whether temp or not) in the first place.

This would require a greater refactoring of this code though.

@alexkruc (Contributor Author) commented Oct 6, 2022

This is a good point. I've refactored the code accordingly; now we are not renaming the file but writing directly to the proper file.
I'd appreciate another review of the new implementation if possible :) 🙏

Comment on lines 923 to 930
filename_in_s3 = s3_obj.key.split('/')[-1]
local_folder_name = local_tmp_file.name.rsplit('/', 1)[0]
local_file_name = f"{local_folder_name}/{filename_in_s3}"

self.log.info("Renaming file from %s to %s", local_tmp_file.name, local_file_name)
rename(local_tmp_file.name, local_file_name)

return local_file_name
@uranusjr (Member):

Suggested change:

-filename_in_s3 = s3_obj.key.split('/')[-1]
-local_folder_name = local_tmp_file.name.rsplit('/', 1)[0]
-local_file_name = f"{local_folder_name}/{filename_in_s3}"
-self.log.info("Renaming file from %s to %s", local_tmp_file.name, local_file_name)
-rename(local_tmp_file.name, local_file_name)
-return local_file_name
+filename_in_s3 = s3_obj.key.rsplit('/', 1)[-1]
+local_tmp_file_name = Path(local_tmp_file.name)
+local_file_name = local_tmp_file_name.with_name(filename_in_s3)
+self.log.info("Renaming file from %s to %s", local_tmp_file_name, local_file_name)
+local_tmp_file_name.rename(local_file_name)
+return str(local_file_name)

Don’t do string manipulation on the path, use structured calls instead.

@alexkruc (Contributor Author):

Thank you for your comment, but I've refactored the code according to @o-nikolas's suggestion, so this is no longer relevant :(

@uranusjr - Can you please review the new implementation? 🙏 It's a bit simpler, and I made sure that I'm using structured calls for paths.


with NamedTemporaryFile(dir=local_path, prefix='airflow_tmp_', delete=False) as local_tmp_file:
if preserve_file_name:
local_dir = local_path if local_path else gettempdir()
@uranusjr (Member):

I'm a bit worried that using the temp dir directly with a predictable file name may cause a vulnerability. I don't have concrete examples, but the combination is sort of a red flag.

@alexkruc (Contributor Author):

WDYM?
I've tested it locally in the Airflow Docker image and in Breeze, and the gettempdir() method returns /tmp on all Linux environments.
When the dir parameter is not provided to NamedTemporaryFile, it is also called directly:
https://github.com/python/cpython/blob/0d68879104dfb392d31e52e25dcb0661801a0249/Lib/tempfile.py#L126

I don't quite understand why it may cause a vulnerability.

Do you think it's better to stay with the older implementation of renaming the file after it's already been created? I think this way is a bit cleaner, but I'm also OK with "reverting" to the old flow.

@alexkruc (Contributor Author):

Also, in all flows, the user can still have a file in S3 with a name that could cause a vulnerability, and it would later be saved in the temp directory generated by the same function (if we don't provide a local_path).

How is this different? Does it mean we can't implement this feature at all?

Member:

To follow up on @uranusjr's thought:

Say we are using LocalExecutor or CeleryExecutor, so two users' jobs can be executed on the same host.

Here you have filename_in_s3 = s3_obj.key.rsplit('/', 1)[-1]. So if user A has the file .../A/data.json and user B has .../B/data.json, there may be a conflict, right?

This is just a vague thought and very likely I missed something. Please feel free to point it out.

Member:

The risk can be greatly reduced if the file is put in a subdirectory instead of directly inside the temp directory root (so the full path the file is downloaded to remains unpredictable), but that may lead to additional cleanup issues since directories are more finicky than files. I'd be happy with it if it works, though.
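
A minimal sketch of that idea (purely illustrative; the prefix and file name are made up):

from pathlib import Path
from tempfile import mkdtemp

# mkdtemp() creates a uniquely named directory, so the full download path stays
# unpredictable even though the file inside keeps its original S3 name.
target = Path(mkdtemp(prefix="airflow_tmp_dir_")) / "data.json"
print(target)  # e.g. /tmp/airflow_tmp_dir_ab12cd34/data.json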

Contributor:

I do also like option 1; it solves many of the complexities you pointed out (or at least bubbles them up to the user) and also allows the user to create a predictable path, so it's probably my preference. But the user should be able to provide a full sub-path within tmp so that they can organize files with similar names to their preference.

Although option 2 would be perfectly serviceable as well.

@alexkruc (Contributor Author):

In the end, I did two things (see the sketch below):

  • Added a check whether the file already exists before writing it, failing the task if it does, to bubble the issue up to the user.
  • Added another parameter, use_autogenerated_subdir, which is True by default and creates a new sub-directory. The user can disable it to control the target file location, but it's on by default.
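
For reference, a rough sketch of how the resulting download flow could look (not the exact diff from this PR; preserve_file_name and use_autogenerated_subdir are the parameters discussed here, everything else is illustrative):

from pathlib import Path
from tempfile import NamedTemporaryFile, gettempdir
from uuid import uuid4


def download_to_local(s3_obj, local_path=None, preserve_file_name=False, use_autogenerated_subdir=True):
    if preserve_file_name:
        local_dir = local_path if local_path else gettempdir()
        # drop the file into a random sub-directory by default so the full path stays unpredictable
        subdir = f"airflow_tmp_dir_{uuid4().hex[:8]}" if use_autogenerated_subdir else ""
        filename_in_s3 = s3_obj.key.rsplit("/", 1)[-1]
        file_path = Path(local_dir, subdir, filename_in_s3)
        if file_path.is_file():
            # bubble the conflict up to the user instead of silently overwriting
            raise FileExistsError(f"{file_path} already exists")
        file_path.parent.mkdir(exist_ok=True, parents=True)
        file = open(file_path, "wb")
    else:
        file = NamedTemporaryFile(dir=local_path, prefix="airflow_tmp_", delete=False)
    with file:
        s3_obj.download_fileobj(file)
    return str(file.name)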

@o-nikolas @uranusjr @XD-DENG Will appreciate your review of the latest additions to this flow 🙏

@o-nikolas (Contributor):

Hey @alexkruc, thanks for sticking with this! The method stub is a little complicated now, but I think it's a decent middle ground given all the constraints that came up in the discussions here 👍

@alexkruc (Contributor Author):

@o-nikolas Thanks!
It seems like this PR is getting a bit stale. Do you think we should do anything else, or is it OK to approve and merge?

@o-nikolas (Contributor):

Hey @alexkruc,

I think this is good to merge, but unfortunately I'm not a committer. CCing @eladkal

@Taragolis (Contributor):

IMHO, this method creates more problems than it solves (it also has an issue with SSE encryption, #25835), and it seems its main purpose is for use in code such as PythonOperator (and the like); only one operator uses this method - S3ToMySqlOperator.

Also, it has the same name as boto3.client("s3").download_file but takes different input arguments, and all other community operators use the boto3 method via S3Hook(...).get_conn().download_file(...) or the equivalent method on the high-level S3 Resource.
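
For illustration, a minimal sketch of that pattern (the connection id, bucket, key, and target path are hypothetical):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Use the hook only for authentication / client creation, then call boto3 directly;
# boto3's download_file(Bucket, Key, Filename) gives full control over the target path.
client = S3Hook(aws_conn_id="aws_default").get_conn()
client.download_file("my-bucket", "path/to/data.json", "/tmp/data.json")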

@eladkal (Contributor) commented Oct 6, 2022

> only one operator uses this method

This is a hook capability, which means it can be used by many operators. Check the user issue: the use case was to use a custom operator with it.

If we are taking the deprecation path, we need to review other functions in the hook (I guess the same question can be raised for other functions as well).

@Taragolis (Contributor) commented Oct 6, 2022

> This is a hook capability, which means it can be used by many operators. Check the user issue: the use case was to use a custom operator with it.

Most of the user issues would be fixed if they avoided this method and just switched to the method provided by boto3 to do whatever they wanted.

In that case, the S3Hook would be used only for authentication and client creation. Everything else they can read in the boto3 documentation, rather than also trying to understand why it won't work in this method and why it behaves differently from download_file.

Again, it's just my IMHO, but I want to highlight that this might be the reason why other community-provided operators generally try to avoid using this method.

And another IMHO: better to just warn the user (UserWarning, docstring, provider documentation) that there is another way to achieve everything they want in their custom components, rather than struggling with this method.

@o-nikolas (Contributor):

> > This is a hook capability, which means it can be used by many operators. Check the user issue: the use case was to use a custom operator with it.
>
> Most of the user issues would be fixed if they avoided this method and just switched to the method provided by boto3 to do whatever they wanted.
>
> In that case, the S3Hook would be used only for authentication and client creation. Everything else they can read in the boto3 documentation, rather than also trying to understand why it won't work in this method and why it behaves differently from download_file.

Yeah, this is a constant balance in Airflow provider packages: the question of when a hook/operator adds enough value over just using the underlying service SDK. In this case the user gets some value from the file/location creation, so it's a hard call.

@Taragolis (Contributor):

I do not have any objections to this PR. My messages are more of a discussion and ideas, not related to the current PR.

My main objection about the S3Hook.download_file method: if it had initially been something like

    @provide_bucket_name
    @unify_bucket_name_and_key
    def download_file(
        self,
        key: str,
        bucket_name: str | None = None, 
        filename: str | None = None, 
        extra_args=None, 
        config=None
    ) -> None:
        self.conn.download_file(
            bucket_name, key, filename, ExtraArgs=extra_args, Callback=None, Config=config
        )

then we would not even be having this discussion and no modification would be required, because it would just be a simple thin wrapper around one boto3 S3 client method that resolves the different combinations of key and bucket name.

But we have what we have and we cannot turn back time. There is probably a lot of existing user code that uses this method; I can only guess how users use the current method and what they additionally want:

  • The user wants to save to a location mounted on persistent storage (an NFS share) and also wants to keep the prefix, not only the filename
  • The user wants to check whether the file already exists in the file system and raise an error
  • The user wants to add the object version to the filename
  • The user wants to delete the file at the end of the execute() method of their operator / callable function

My current idea: just inform users (at least in the docstring) that there might be a better way to solve their cases than trying to use this method.

Also, for a better user experience, we could add an additional static method to the hook for splitting an S3 URL into its parts; it might help users do specific things without writing extra code.

Sample of such a helper:

from typing import NamedTuple
from urllib.parse import urlparse


class S3UrlParts(NamedTuple):
    bucket: str
    key: str | None
    prefix: str | None
    parts: list[str]
    filename: str | None
    basename: str | None
    suffix: str | None
    suffixes: list[str]


def parse_url(url: str):
    if "://" not in url:
        raise ValueError(f"Invalid S3 URL: {url!r}")
    url_parts = urlparse(url)
    if url_parts.scheme != "s3":
        # Yeah, I know there are about 4 ways to define an S3 URL, not limited to s3://bucket/prefix/key.json
        raise ValueError(f"Invalid S3 URL (incorrect schema): {url!r}")
    elif not url_parts.netloc:
        raise ValueError(f"Invalid S3 URL (no bucket name): {url!r}")

    bucket = url_parts.netloc
    key = url_parts.path.lstrip("/")
    key_parts = key.rsplit("/", 1)
    if len(key_parts) == 2:
        prefix, filename = key_parts
    else:
        prefix, filename = None, key_parts[0]

    parts = []
    basename = None
    suffix = None
    suffixes = []
    if prefix:
        parts.extend(prefix.split("/"))
    if filename:
        parts.append(filename)

        file_parts = filename.split(".", 1)
        if len(file_parts) == 2:
            basename, suffix = file_parts
            suffixes = suffix.split(".")
        else:
            basename, suffix = file_parts[0], None

    return S3UrlParts(
        bucket,
        key=key,
        prefix=prefix or None,
        parts=parts,
        filename=filename or None,
        basename=basename,
        suffix=suffix,
        suffixes=suffixes
    )

Sample of "user" code

s3_urls = [
    "s3://awesome-bucket/this/is/path/to/awesome-data.parquet.snappy",
    "s3://awesome-bucket",
    "s3://awesome-bucket/",
    "s3://awesome-bucket/prefix/",
    "s3://awesome-bucket/key.json",
    "s3://awesome-bucket/key-with-no-extension",
    "s3://awesome-bucket/key-with-dot-in-the-end.",
]

for s3_url in s3_urls:
    print(f"S3 URL: {s3_url}")
    print(f"Parsed URL: {parse_url(s3_url)}")
    print()

Sample Output

S3 URL: s3://awesome-bucket/this/is/path/to/awesome-data.parquet.snappy
S3UrlParts(bucket='awesome-bucket', key='this/is/path/to/awesome-data.parquet.snappy', prefix='this/is/path/to', parts=['this', 'is', 'path', 'to', 'awesome-data.parquet.snappy'], filename='awesome-data.parquet.snappy', basename='awesome-data', suffix='parquet.snappy', suffixes=['parquet', 'snappy'])

S3 URL: s3://awesome-bucket
S3UrlParts(bucket='awesome-bucket', key='', prefix=None, parts=[], filename=None, basename=None, suffix=None, suffixes=[])

S3 URL: s3://awesome-bucket/
S3UrlParts(bucket='awesome-bucket', key='', prefix=None, parts=[], filename=None, basename=None, suffix=None, suffixes=[])

S3 URL: s3://awesome-bucket/prefix/
S3UrlParts(bucket='awesome-bucket', key='prefix/', prefix='prefix', parts=['prefix'], filename=None, basename=None, suffix=None, suffixes=[])

S3 URL: s3://awesome-bucket/key.json
S3UrlParts(bucket='awesome-bucket', key='key.json', prefix=None, parts=['key.json'], filename='key.json', basename='key', suffix='json', suffixes=['json'])

S3 URL: s3://awesome-bucket/key-with-no-extension
S3UrlParts(bucket='awesome-bucket', key='key-with-no-extension', prefix=None, parts=['key-with-no-extension'], filename='key-with-no-extension', basename='key-with-no-extension', suffix=None, suffixes=[])

S3 URL: s3://awesome-bucket/key-with-dot-in-the-end.
S3UrlParts(bucket='awesome-bucket', key='key-with-dot-in-the-end.', prefix=None, parts=['key-with-dot-in-the-end.'], filename='key-with-dot-in-the-end.', basename='key-with-dot-in-the-end', suffix='', suffixes=[''])

@alexkruc alexkruc requested a review from eladkal as a code owner October 11, 2022 09:03
:return: the file name.
:rtype: str
"""
self.log.info(
@alexkruc (Contributor Author):

@Taragolis I've added a log message here to make it clear that this function shadows boto3's method - hope that's fine :)

@eladkal (Contributor) left a comment

@uranusjr @XD-DENG any further comments?

@eladkal eladkal requested review from XD-DENG and uranusjr October 16, 2022 07:44
@alexkruc alexkruc requested review from uranusjr and removed request for XD-DENG October 18, 2022 11:16
@eladkal (Contributor) commented Oct 24, 2022

If there are no further concerns, I'll merge this PR so it will be available in the next provider wave.
@alexkruc can you please rebase and resolve conflicts?

@potiuk (Member) commented Oct 24, 2022

Unfortunately it needs a rebase/conflict resolution after the string normalization, @alexkruc.

@alexkruc (Contributor Author):

Hey @eladkal and @potiuk - I've rebased, and after the fix in #27258 (thanks for this! it drove me crazy yesterday 😄) the CI tests are passing.
So can we merge it? 🙏


Labels

area:providers provider:amazon AWS/Amazon - related issues

Development

Successfully merging this pull request may close these issues.

Json files from S3 downloading as text files

8 participants