
Fixes/improvements to coiled_flow #4

Closed · scharlottej13 opened this issue Jan 5, 2023 · 4 comments

@scharlottej13 (Contributor) commented Jan 5, 2023

I was working through running this flow as part of https://github.com/coiled/platform/issues/294 and noticed a few things (in order of importance). Thanks for putting this flow together, @hayesgb, and for working on this with me; otherwise I would not have noticed any of these. To be clear, I'm happy to open PRs for any/all of these.

1. Using `dask.distributed.get_client` puts most of the work on a single worker.

   Screencast of the scheduler dashboard: https://user-images.githubusercontent.com/8620816/210840205-61108962-0e82-45d3-ba5a-a8a80e1ec770.mov

   The fix for this would be to use `prefect_dask.get_dask_client`; however, this no longer works. The fix instead is to use `dask.distributed.get_worker_client(separate_thread=False)`, which is essentially what `get_dask_client` does (see "Add get_dask_client", PrefectHQ/prefect-dask#33).

   Furthermore, though I was able to use `get_worker_client(separate_thread=False)` in a minimal reproducer, I'm still working on ensuring it works for `flows/coiled_flow.py`; the write seems slower than I'd expect:

   ```python
   dd.to_parquet(
       ddf,
       fpath,
       storage_options=storage_options,
       name_function=name_func,
   )
   ```

   All this to say, I'm still working on disentangling what's going on here, and will open a follow-up issue in the appropriate repo.

Minimal repro w/ `worker_client`:

```python
import dask
from dask.distributed import worker_client
from prefect import flow, task
from prefect_dask import DaskTaskRunner

coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "n_workers": 4,
        "package_sync": True,
        "name": "worker-client-prefect",
    },
)


@task
def calling_compute_in_a_task():
    # Connect to the worker's client in the same thread so the computation
    # is distributed across the cluster instead of piling onto one worker.
    with worker_client(separate_thread=False) as client:
        df = dask.datasets.timeseries("2000", "2005", partition_freq="2w")
        summary_df = df.describe()
        client.compute(summary_df)


@flow(task_runner=coiled_runner)
def test_flow():
    calling_compute_in_a_task.submit()


if __name__ == "__main__":
    test_flow()
```
2. The S3 bucket names are hard-coded and not public; is this intentional?

3. We shouldn't need a Prefect Block for the AWS credentials; Coiled automatically creates an STS token and sends it to the cluster. I also think this is more secure than grabbing credentials from a Prefect Block, since this returns the secret as a plain string (see the sketch below this list):

   ```python
   "secret": creds.aws_secret_access_key.get_secret_value(),
   ```

@hayesgb (Contributor) commented Jan 5, 2023

Thanks for digging in, @scharlottej13.

What you're observing with the different options to `get_client` is consistent with my experience, which is why I switched to a Dask-native approach. I did not notice an issue with tasks being unevenly distributed, so I can't speak to that.

Bucket names are hard-coded because this is intended (long-term) to be part of a pipeline that can be used for incremental processing. It would be fine for them to be public and read-only. I'm using the output of this workflow here.

I agree about the Block, but when you run this from a Prefect Agent, the user's credentials are not present to create the STS token.

@scharlottej13 (Contributor, Author)

> What you're observing with the different options to `get_client` is consistent with my experience, which is why I switched to a Dask-native approach. I did not notice an issue with tasks being unevenly distributed, so I can't speak to that.

Ah, OK. I hadn't realized `get_dask_client` wasn't working for you (sorry if I misunderstood this when we last chatted).

> Bucket names are hard-coded because this is intended (long-term) to be part of a pipeline that can be used for incremental processing. It would be fine for them to be public and read-only. I'm using the output of this workflow here.

Thanks for the explanation; it's cool that it will feed into the xgboost example + post too.

> I agree about the Block, but when you run this from a Prefect Agent, the user's credentials are not present to create the STS token.

Oh right. I'm curious if there is a way to make this work...

@hayesgb (Contributor) commented Jan 6, 2023

It depends heavily on how the Agent is provisioned. If it's an isolated machine using a single user's credentials, it would be fine, but that doesn't work well in a production environment as the number of jobs increases. The more realistic use case is multiple agents with different associated credentials, possibly running in pods on a Kubernetes cluster (based on what I saw when I started looking at agent deployment methods).

@scharlottej13 (Contributor, Author)

Closing this for now; opened an issue for `get_dask_client`: PrefectHQ/prefect#12971
