Download specific files in FlyteDirectory #2059

Merged
merged 4 commits into from
Jan 3, 2024
pingsutw
Member

Tracking issue

NA

Why are the changes needed?

Calling os.listdir(FlyteDirectory(...)) makes flytekit download the entire directory. However, users may want to download only the specific files a task actually uses at run time.

What changes were proposed in this pull request?

Add a new method, listdir, to FlyteDirectory that lists the files and folders in the given directory. listdir returns only FlyteFile or FlyteDirectory objects.
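
To illustrate the idea without flytekit, here is a minimal sketch of a one-level listing whose entries are fetched only when opened. `LazyFile`, `LazyDir`, and `lazy_listdir` are hypothetical names, and a local directory stands in for the remote bucket. This is not the code added by this PR, only an illustration of deferring each download until the entry is actually used.

```python
import os
import shutil
import tempfile
from typing import List, Union


class LazyFile(os.PathLike):
    """Hypothetical stand-in for FlyteFile: fetched on first __fspath__ call."""

    def __init__(self, remote_source: str, cache_dir: str):
        self.remote_source = remote_source
        self._local = os.path.join(cache_dir, os.path.basename(remote_source))
        self.downloaded = False

    def __fspath__(self) -> str:
        # open() calls __fspath__, so the copy happens only when the file is used.
        if not self.downloaded:
            shutil.copy(self.remote_source, self._local)
            self.downloaded = True
        return self._local


class LazyDir:
    """Hypothetical stand-in for FlyteDirectory: holds a location, fetches nothing."""

    def __init__(self, remote_source: str):
        self.remote_source = remote_source


def lazy_listdir(remote_dir: str, cache_dir: str) -> List[Union[LazyDir, LazyFile]]:
    """List one level of remote_dir without copying any file contents."""
    entries: List[Union[LazyDir, LazyFile]] = []
    for name in sorted(os.listdir(remote_dir)):
        path = os.path.join(remote_dir, name)
        if os.path.isdir(path):
            entries.append(LazyDir(path))
        else:
            entries.append(LazyFile(path, cache_dir))
    return entries


if __name__ == "__main__":
    remote, cache = tempfile.mkdtemp(), tempfile.mkdtemp()
    with open(os.path.join(remote, "file1.txt"), "w") as fh:
        fh.write("Content of file1.txt")
    os.mkdir(os.path.join(remote, "sub_dir"))

    entries = lazy_listdir(remote, cache)
    print(os.listdir(cache))       # nothing has been copied yet
    with open(entries[0]) as fh:   # triggers the copy of file1.txt only
        print(fh.read())
```

In this sketch, listing never touches file contents; only the entry passed to open() is copied into the cache directory.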

How was this patch tested?

import os
import tempfile

from flytekit import FlyteContext, ImageSpec, task, workflow
from flytekit.types.directory import FlyteDirectory

new_flytekit = "git+https://github.com/flyteorg/flytekit.git@a5171380ea8a1aa55e20647a06e4313cbc19bec4"
image_spec = ImageSpec(registry="pingsutw", apt_packages=["git"], packages=[new_flytekit])


@task(container_image=image_spec)
def t1() -> FlyteDirectory:
    temp_dir = tempfile.mkdtemp(prefix='temp_example_')

    # Create some sample files in the temporary directory
    file1_path = os.path.join(temp_dir, 'file1.txt')
    with open(file1_path, 'w') as file1:
        file1.write('Content of file1.txt')

    file2_path = os.path.join(temp_dir, 'file2.txt')
    with open(file2_path, 'w') as file2:
        file2.write('Content of file2.txt')

    sub_dir = os.path.join(temp_dir, 'sub_dir')
    os.mkdir(sub_dir)
    file3_path = os.path.join(sub_dir, 'file3.txt')
    with open(file3_path, 'w') as file3:
        file3.write('Content of file3.txt')

    remote_dir = FlyteContext.current_context().file_access.get_random_remote_directory()

    return FlyteDirectory(path=temp_dir, remote_directory=remote_dir)


@task(container_image=image_spec)
def t2(directory: FlyteDirectory):
    entity = FlyteDirectory.listdir(directory)
    for e in entity:
        print("s3 object:", e.remote_source)
        # s3 object: s3://test-flytedir/file1.txt
        # s3 object: s3://test-flytedir/file2.txt
        # s3 object: s3://test-flytedir/sub_dir

    # First open: downloads the s3 file to local disk
    with open(entity[0], "r") as f:
        print(f.read())
    # Getting s3://test-flytedir/file1.txt to /var/folders/xp/_4ltv_bx3pb_r0bm00fk9z_00000gn/T/flyte-ed16jhdg/sandbox/local_flytekit/6efb309606124d7104991164a0503671
    # Content of file1.txt

    # Second open: reads the already-downloaded copy from local disk
    with open(entity[0], "r") as f:
        print(f.read())
    # Content of file1.txt


@workflow
def wf():
    t2(directory=t1())


if __name__ == '__main__':
    wf()

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed off.

Related PRs

NA

Docs link

NA

Signed-off-by: Kevin Su <pingsutw@apache.org>

codecov bot commented Dec 16, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (d295000) 85.98% compared to head (d4a6444) 86.03%.
Report is 4 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2059      +/-   ##
==========================================
+ Coverage   85.98%   86.03%   +0.05%     
==========================================
  Files         308      308              
  Lines       23016    23051      +35     
  Branches     3479     3489      +10     
==========================================
+ Hits        19790    19832      +42     
+ Misses       2616     2611       -5     
+ Partials      610      608       -2     


pingsutw self-assigned this on Dec 16, 2023
@kumare3
Contributor

kumare3 commented Jan 2, 2024

You should also be able to do this using FlyteDirectory.crawl()...

@@ -236,6 +236,66 @@ def new_dir(self, name: typing.Optional[str] = None) -> FlyteDirectory:
    def download(self) -> str:
        return self.__fspath__()

    @classmethod
    def listdir(cls, directory: FlyteDirectory) -> typing.List[typing.Union[FlyteDirectory, FlyteFile]]:
Contributor

Why not make this an instance method on FlyteDirectory instead of a class method?
Also, I want to support async versions of all these methods.

Member Author

I can add them in a separate PR.
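
One possible shape for that follow-up, sketched without flytekit: an instance method that delegates to the class method, plus an async variant that runs the blocking listing in a worker thread. `Directory`, `list`, and `list_async` are hypothetical names chosen for illustration, and `asyncio.to_thread` (Python 3.9+) is just one way to get an awaitable version. Nothing here reflects what was eventually merged.

```python
import asyncio
import os
import tempfile
from typing import List


class Directory:
    """Hypothetical stand-in for FlyteDirectory, backed by a local path."""

    def __init__(self, path: str):
        self.path = path

    @classmethod
    def listdir(cls, directory: "Directory") -> List[str]:
        # Class-method form, as in this PR: the directory is passed explicitly.
        return sorted(os.listdir(directory.path))

    def list(self) -> List[str]:
        # Instance-method form: delegates to the class method.
        return type(self).listdir(self)

    async def list_async(self) -> List[str]:
        # Async form: run the blocking listing in a worker thread.
        return await asyncio.to_thread(self.list)


if __name__ == "__main__":
    root = tempfile.mkdtemp()
    open(os.path.join(root, "file1.txt"), "w").close()
    d = Directory(root)
    print(Directory.listdir(d), d.list(), asyncio.run(d.list_async()))
```

Keeping the class method and delegating to it means existing callers of `listdir(directory)` would keep working while the instance and async forms are added.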

@pingsutw
Member Author

pingsutw commented Jan 3, 2024

> you should also be able to do this using flytedirectory.crawl()...

It won't return FlyteFile objects.
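
The distinction can be sketched with the standard library alone. The exact return shape of crawl() is not shown in this thread, so the sketch assumes only what the reply implies: a recursive walk that yields plain path strings, contrasted with a one-level listing that tells files and directories apart. `crawl_paths` and `list_one_level` are hypothetical helpers, not flytekit APIs.

```python
import os
import tempfile
from typing import Iterator, List, Tuple


def crawl_paths(root: str) -> Iterator[Tuple[str, str]]:
    """Recursive walk yielding (root, relative_path) strings for every file."""
    for base, _dirs, files in os.walk(root):
        for name in sorted(files):
            yield root, os.path.relpath(os.path.join(base, name), root)


def list_one_level(root: str) -> List[Tuple[str, str]]:
    """Single-level listing that tags each entry as 'file' or 'dir'."""
    entries = []
    for name in sorted(os.listdir(root)):
        kind = "dir" if os.path.isdir(os.path.join(root, name)) else "file"
        entries.append((kind, name))
    return entries


if __name__ == "__main__":
    root = tempfile.mkdtemp()
    os.mkdir(os.path.join(root, "sub_dir"))
    for rel in ("file1.txt", os.path.join("sub_dir", "file3.txt")):
        open(os.path.join(root, rel), "w").close()
    print(list(crawl_paths(root)))   # strings only, all levels
    print(list_one_level(root))      # typed entries, top level only
```

With string paths, the caller still has to wrap each result before it can be opened lazily; a typed listing hands back objects that already carry that behaviour.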

@pingsutw pingsutw merged commit 6f613e7 into master Jan 3, 2024
75 checks passed