
It's unclear how/where I should configure job concurrency in Dagster OSS based on the documentation #19402

Open
JasperHG90 opened this issue Jan 25, 2024 · 3 comments
Labels
area: concurrency Related to controlling concurrent execution

Comments

@JasperHG90

What's the issue or suggestion?

Situation
I have deployed Dagster OSS to better understand the deployment process. I have separate deployments for the Dagster daemon & webserver and for my code locations.

I am deploying a DAG with a single partitioned asset.

What I want to do
I want to control the concurrency of the job to which the asset belongs, ideally on a code-location basis.

Documentation I'm following
I'm following the article "Limiting concurrency in data pipelines". I'm having a hard time understanding exactly where I need to specify limits on the concurrency of my job.

For example, the following section:

[screenshot: docs section showing run concurrency limits configured in dagster.yaml]

This makes it seem as if you can control concurrency on a per-code-location basis using the dagster.yaml file. While this works when setting the DAGSTER_HOME environment variable and running dagster dev, it cannot be used when deploying your code location (which calls dagster api grpc). See also this issue.
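For context, the section in question configures run concurrency on the instance via the QueuedRunCoordinator in dagster.yaml, roughly like this (a sketch based on the article; exact keys may vary by version):

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 10
    tag_concurrency_limits:
      - key: "database"
        value: "redshift"
        limit: 4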

After some searching, I found that the Helm chart's values.yaml contains the same section as mentioned in the docs. This suggests that, when using Dagster OSS, you need to place the concurrency limits there.
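For example, something along these lines (a sketch; I'd double-check the key names against the chart's values.yaml):

dagsterDaemon:
  runCoordinator:
    enabled: true
    type: QueuedRunCoordinator
    config:
      queuedRunCoordinator:
        maxConcurrentRuns: 10
        tagConcurrencyLimits:
          - key: "database"
            value: "redshift"
            limit: 4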

It would be great if the docs were clearer on this.

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

@JasperHG90 JasperHG90 added the area: docs Related to documentation in general label Jan 25, 2024
@sam-goodwin

I am following the docs precisely and still getting errors. It's very confusing.

my_job = define_asset_job(
    name="my_jb",
    partitions_def=scan_id,
    executor_def=multiprocess_executor,
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "tag_concurrency_limits": [
                        {
                            "key": "yarn",
                            "value": "dask",
                            "limit": 3,
                        }
                    ],
                },
            }
        }
    },
)

Error:

Backfill failed for jnpeozgr: dagster._core.errors.DagsterInvalidConfigError: Error in config for job orion_human
    Error 1: Received unexpected config entry "multiprocess" at path root:execution:config. Expected: "{ max_concurrent?: Int? retries?: { disabled?: { } enabled?: { } } start_method?: { forkserver?: { preload_modules?: [String] } spawn?: { } } tag_concurrency_limits?: [{ key: String limit: Int value?: (String | { applyLimitPerUniqueValue: Bool }) }] }".
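Reading the expected schema in that error, it looks like the executor config should sit directly under execution.config, without the "multiprocess" wrapper, once the job pins executor_def. A sketch based purely on that schema (untested):

my_job = define_asset_job(
    name="my_job",
    partitions_def=scan_id,
    executor_def=multiprocess_executor,
    config={
        "execution": {
            # keys taken from the expected schema in the error above
            "config": {
                "tag_concurrency_limits": [
                    {
                        "key": "yarn",
                        "value": "dask",
                        "limit": 3,
                    }
                ],
            }
        }
    },
)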

Docs:
[screenshot of the documentation example being followed]

@garethbrickman garethbrickman changed the title It's unclear how/where I should configure job concurrency in dagster OS based on the documentation It's unclear how/where I should configure job concurrency in Dagster OSS based on the documentation Mar 13, 2024
@garethbrickman garethbrickman added the area: concurrency Related to controlling concurrent execution label Mar 13, 2024
@garethbrickman
Contributor

@sam-goodwin Could you try restructuring your code as below?

# Configure the multiprocess_executor with the desired settings
configured_multiprocess_executor = multiprocess_executor.configured(
    {
        "max_concurrent": 3,  # limits concurrent processes to 3
        "tag_concurrency_limits": [
            {
                "key": "yarn",
                "value": "dask",
                "limit": 3,
            }
        ],
    }
)

# Define the job with the configured executor
my_job = define_asset_job(
    name="my_job",
    partitions_def=scan_id,
    executor_def=configured_multiprocess_executor,
)
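For what it's worth, since .configured bakes these settings into the executor itself, the job-level config={"execution": ...} block shouldn't be needed alongside it.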

@yuhan yuhan removed the area: docs Related to documentation in general label Apr 2, 2024
@Tazoeur

Tazoeur commented Jun 26, 2024

I've faced the same situation and can't get around it.

Here's some sample code to reproduce the error. I've tested it with Python 3.10 and Dagster 1.7.10:

from time import sleep
from dagster import asset, Definitions, define_asset_job, multiprocess_executor
from pathlib import Path
from contextlib import contextmanager


@contextmanager
def simple_lock():
    # Naive file-based lock: raises if two assets try to hold it at once.
    lock_file = Path("simple_lock_file")

    if lock_file.exists():
        raise Exception("Already locked!")

    lock_file.touch()

    try:
        yield None
    finally:
        # Release the lock even if the body raises.
        lock_file.unlink()


@asset(tags={"my_key": "my_value"})
def elem_1_step_1(context) -> str:
    with simple_lock():
        sleep(10)

        return "elem_1_step_1"


@asset(tags={"my_key": "my_value"})
def elem_2_step_1(context) -> str:
    with simple_lock():
        sleep(10)

        return "elem_2_step_1"


@asset
def elem_1_step_2(context, elem_1_step_1) -> str:
    return "elem_1_step_2"


@asset
def elem_2_step_2(context, elem_2_step_1) -> str:
    return "elem_2_step_2"


elem_job = define_asset_job(
    "elem_job",
    selection=["elem_1_step_1", "elem_2_step_1", "elem_1_step_2", "elem_2_step_2"],
)

defs = Definitions(
    assets=[elem_1_step_1, elem_1_step_2, elem_2_step_1, elem_2_step_2], jobs=[elem_job]
)

So this will obviously fail because all the "step_1" assets in this job will be launched simultaneously.

I have tested both methods provided above, and neither is working:

# not working
configured_multiprocess_executor = multiprocess_executor.configured(
    {
        "tag_concurrency_limits": [
            {
                "key": "my_key",
                "value": "my_value",
                "limit": 1,
            }
        ],
    }
)
elem_job = define_asset_job(
    "elem_job",
    selection=["elem_1_step_1", "elem_2_step_1", "elem_1_step_2", "elem_2_step_2"],
    executor_def=configured_multiprocess_executor,
)
# not working
elem_job = define_asset_job(
    "elem_job",
    selection=["elem_1_step_1", "elem_2_step_1", "elem_1_step_2", "elem_2_step_2"],
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "tag_concurrency_limits": [
                        {
                            "key": "my_key",
                            "value": "my_value",
                            "limit": 1,
                        }
                    ],
                },
            }
        }
    },
)

The only way I was able to get it working was with:

elem_job = define_asset_job(
    "elem_job",
    selection=["elem_1_step_1", "elem_2_step_1", "elem_1_step_2", "elem_2_step_2"],
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 1,
                },
            }
        }
    },
)

However, this has the disadvantage of not launching the rest of the job's assets concurrently.
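One possible explanation for the tag_concurrency_limits attempts failing (an assumption on my part, not verified here): the multiprocess executor matches tag_concurrency_limits against op tags, whereas @asset(tags=...) sets asset tags. Declaring the tag via op_tags might make the limit apply, e.g.:

# hypothetical variant of the assets above (untested): expose the tag as
# an op tag so the executor's tag_concurrency_limits can match it
@asset(op_tags={"my_key": "my_value"})
def elem_1_step_1(context) -> str:
    with simple_lock():
        sleep(10)
        return "elem_1_step_1"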

Does someone have any update on the state of this issue?
