In [31]:
import os
from pathlib import Path

# Point flytectl at the sandbox configuration stored under ~/.flyte.
os.environ["FLYTECTL_CONFIG"] = os.fspath(Path.home().joinpath(".flyte", "config-sandbox.yaml"))

In [32]:
# Install requirements.txt plus a pinned flytekit pre-release (1.14.0b6) into this kernel's
# environment. Restart the kernel afterwards if flytekit was already imported.
%pip install -r ./requirements.txt "flytekit==1.14.0b6"

Note: you may need to restart the kernel to use updated packages.


In [33]:
import flytekit as fl
import sys

# Image used by the "*_current" tasks: built from requirements.txt for this kernel's
# Python major.minor version. localhost:30000 is the registry bundled with the Flyte
# sandbox; any other registry works as well.
custom_image = fl.ImageSpec(
    name="async-testing",
    requirements="./requirements.txt",
    python_version="{}.{}".format(*sys.version_info[:2]),
    registry="localhost:30000",
    # NOTE(review): architecture is hardcoded; matches an arm64 sandbox host — confirm
    # before running on an amd64 cluster.
    platform="linux/arm64",
)



In [34]:
import os
import platform
import subprocess
import typing
from pathlib import Path
import asyncio

from flytekit import task, workflow, ImageSpec, Resources, WorkflowFailurePolicy
from flytekit.core.utils import timeit
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory

In [35]:
# Extra environment variables for tasks; empty by default.
# NOTE(review): not passed to any task in the code shown here — presumably intended for a
# task's `environment=` to enable debug settings; confirm or remove.
envs = {}


def mbytes_to_bytes(x: int) -> int:
    """Convert mebibytes (MiB, 1024 * 1024 bytes) to bytes.

    :param x: size in MiB.
    :return: the same size in bytes.
    """
    return x * 1024 * 1024


def fallocate(file_path: str, num_bytes: int):
    """Preallocate ``num_bytes`` bytes for ``file_path`` using the ``fallocate`` CLI.

    Best-effort: a non-zero exit status is printed rather than raised. Only
    ``CalledProcessError`` is handled, so a missing ``fallocate`` binary still raises.
    """
    command = ["fallocate", "-l", str(num_bytes), file_path]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as err:
        print(f"Error: {err}")
        return
    print(f"Allocated {num_bytes} bytes to {file_path}")


def dd(file_path: str, num_bytes: int):
    """Fill ``file_path`` with ``num_bytes`` random bytes using ``dd``.

    Reads ``/dev/urandom`` as a single block of ``num_bytes`` (``count=1``), so dd
    needs a buffer of that size. Best-effort: a non-zero exit status is printed
    rather than raised.
    """
    command = [
        "dd",
        "if=/dev/urandom",
        f"of={file_path}",
        f"bs={num_bytes}",
        "count=1",
    ]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as err:
        print(f"Error: {err}")
        return
    print(f"Wrote {num_bytes} bytes to {file_path}")


@timeit("generate_files")
def generate_files(count: int, size_bytes: int, parent: Path) -> typing.List[Path]:
    """Create ``count`` files of ``size_bytes`` bytes each under ``parent``.

    Files are named ``file_0.bin`` ... ``file_{count-1}.bin``. On macOS (Darwin) they
    are filled with random bytes via ``dd``; elsewhere space is preallocated with
    ``fallocate``.

    :param count: number of files to create.
    :param size_bytes: size of each file in bytes.
    :param parent: directory to write into; created (with parents) if missing.
    :return: paths of the generated files, in index order.
    :raises ValueError: if ``parent`` exists but is not a directory.
    :raises RuntimeError: if a file is missing or smaller than ``size_bytes`` after
        generation. The ``dd``/``fallocate`` helpers only print their errors, so the
        failure is surfaced here instead of later as an opaque upload error.
    """
    if parent.exists() and not parent.is_dir():
        raise ValueError(f"{parent} is not a directory")
    parent.mkdir(parents=True, exist_ok=True)

    # The host OS cannot change between iterations, so choose the writer once.
    writer = dd if platform.system() == "Darwin" else fallocate

    paths: typing.List[Path] = []
    for i in range(count):
        p = parent / f"file_{i}.bin"
        writer(str(p), size_bytes)
        if not p.is_file() or p.stat().st_size < size_bytes:
            raise RuntimeError(f"Failed to generate {p} with {size_bytes} bytes")
        paths.append(p)
    return paths


def generate_and_upload_list(count: int, num_bytes: int) -> typing.List[FlyteFile]:
    """Generate ``count`` files of ``num_bytes`` bytes and wrap each in a FlyteFile.

    Files are written under the fixed directory ``/home/flytekit/temp/gen``. Nothing
    is uploaded here; Flyte uploads the local files when a task returns these
    FlyteFiles as outputs.
    """
    target_dir = Path("/home/flytekit/temp/gen")
    generated = generate_files(count, num_bytes, target_dir)
    return [FlyteFile(path=str(local_path)) for local_path in generated]


@task(enable_deck=True, requests=Resources(cpu="1", mem="3Gi"))
def generate_list_async(count: int, num_m_bytes: int) -> typing.List[FlyteFile]:
    """Generate ``count`` files of ``num_m_bytes`` MiB each and return them as a list.

    No ``container_image`` is set, so this task runs on the default image.
    """
    return generate_and_upload_list(count=count, num_bytes=mbytes_to_bytes(num_m_bytes))


@task(container_image=custom_image, enable_deck=True, requests=Resources(cpu="1", mem="3Gi"))
def generate_list_current(count: int, num_m_bytes: int) -> typing.List[FlyteFile]:
    """Generate ``count`` files of ``num_m_bytes`` MiB each and return them as a list.

    Runs on ``custom_image`` (built from requirements.txt).
    """
    return generate_and_upload_list(count=count, num_bytes=mbytes_to_bytes(num_m_bytes))


def _generate_folder(count: int, num_m_bytes: int) -> FlyteDirectory:
    """Generate ``count`` files of ``num_m_bytes`` MiB and return their directory.

    Shared body of ``generate_folder_async`` and ``generate_folder_current``, which
    differ only in the image they run on.

    :raises ValueError: if ``count`` < 1, because the directory is located via the
        first generated file.
    """
    if count < 1:
        raise ValueError(f"count must be >= 1 to generate a folder, got {count}")
    files = generate_and_upload_list(count=count, num_bytes=mbytes_to_bytes(num_m_bytes))
    # generate_and_upload_list writes every file into one directory; recover it from the first.
    parent_folder = str(Path(files[0].path).parent)
    return FlyteDirectory(path=parent_folder)


@task(enable_deck=True, requests=Resources(cpu="1", mem="3Gi"))
def generate_folder_async(count: int, num_m_bytes: int) -> FlyteDirectory:
    """Generate files on the default image (no ``container_image``) and return them as one directory."""
    return _generate_folder(count, num_m_bytes)


@task(container_image=custom_image, enable_deck=True, requests=Resources(cpu="1", mem="3Gi"))
def generate_folder_current(count: int, num_m_bytes: int) -> FlyteDirectory:
    """Generate files on ``custom_image`` and return them as one directory."""
    return _generate_folder(count, num_m_bytes)


async def download_files(ffs: typing.List[FlyteFile]):
    """Start ``_download()`` for every file concurrently and await them all.

    Returns the per-file results in input order. Because ``return_exceptions=True``,
    a failed download appears in the returned list as its exception instead of
    raising here.
    """
    pending = []
    for ff in ffs:
        pending.append(asyncio.create_task(ff._download()))
    return await asyncio.gather(*pending, return_exceptions=True)


@task(enable_deck=True)
def download_list_async(ffs: typing.List[FlyteFile]):
    """Download every file in ``ffs`` concurrently, timing the whole batch.

    ``download_files`` gathers with ``return_exceptions=True``. Every transfer
    therefore runs to completion inside the timed section, and failures come back
    as values rather than raising. Those values are checked afterwards so the task
    fails instead of reporting success, and a misleading timing, for a partial
    download.

    :param ffs: files produced by an upstream task.
    :raises RuntimeError: if any download failed. Every failure is printed, and the
        first one is chained as the cause.
    """
    from flytekit.utils.asyn import run_sync

    with timeit("download files"):
        results = run_sync(download_files, ffs)

    failures = [(ff, res) for ff, res in zip(ffs, results) if isinstance(res, BaseException)]
    for ff, err in failures:
        print(f"Failed to download {ff.remote_source}: {err!r}")
    if failures:
        raise RuntimeError(f"{len(failures)} of {len(ffs)} downloads failed") from failures[0][1]


# Serial baseline: runs on the default image (no container_image), which is acceptable
# for this simple comparison.
@task(enable_deck=True)
def download_list_serial(ffs: typing.List[FlyteFile]):
    """Download ``ffs`` one at a time as the baseline for the concurrent download.

    The loop is wrapped in the same ``timeit("download files")`` section as the
    concurrent variant. Both tasks therefore report a timing that covers only the
    downloads, not task startup.

    :param ffs: files produced by an upstream task.
    Exceptions from ``download()`` are not caught; the first one stops the loop.
    """
    with timeit("download files"):
        for ff in ffs:
            print(f"Downloading {ff.remote_source} to {ff.path}")
            ff.download()


@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def wf(count: int = 1000, num_m_bytes: int = 2):
    """Compare file generation and download across the two images.

    Generates ``count`` files of ``num_m_bytes`` MiB each, both as a list and as a
    directory. This is done once on ``custom_image`` (the ``*_current`` tasks) and
    once on the default image (the ``*_async`` tasks). The list from
    ``generate_list_async`` is then downloaded both concurrently and serially.

    The failure policy lets the remaining executable nodes finish before the
    workflow is marked failed, so one broken node does not cancel the other
    comparisons.
    """
    generate_list_current(count=count, num_m_bytes=num_m_bytes)
    # Only this list feeds the download tasks below; the other generation outputs are unused.
    file_list = generate_list_async(count=count, num_m_bytes=num_m_bytes)
    generate_folder_current(count=count, num_m_bytes=num_m_bytes)
    generate_folder_async(count=count, num_m_bytes=num_m_bytes)
    download_list_async(ffs=file_list)
    download_list_serial(ffs=file_list)


In [36]:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config

sandbox_config = Config.for_sandbox()
remote = FlyteRemote(
    sandbox_config,
    default_project="flytesnacks",
    default_domain="development",
    # Optional in notebooks: interactive mode is enabled automatically there.
    interactive_mode_enabled=True,
)
# Launch `wf` with its default inputs and block until the execution completes.
wf_exec = remote.execute(wf, inputs={}, wait=True)