
Intermittent DagsterInvariantViolationError on widely-partitioned assets #19663

Open
ntellis opened this issue Feb 7, 2024 · 7 comments
Labels
type: bug Something isn't working

Comments

@ntellis

ntellis commented Feb 7, 2024

Dagster version

1.5.13

What's the issue?

We started to experience an intermittent error when attempting to materialize partitioned assets (in this case, the asset is named mpc_precovery_asset). The error takes the form:

dagster._core.errors.DagsterInvariantViolationError: __ASSET_JOB_3 has no op named mpc_precovery_asset.

This started to occur when we added a new monthly-partitioned asset with ~100 partitions. After adding this asset, roughly 20-80% of all materialization jobs for all partitioned assets would fail with this same error.

Each of the following changes fixed the error:

  • remove partitioning on mpc_precovery_asset
  • remove another unrelated partitioned asset
  • move the new asset to a different code location

No change to the asset job definition itself made any difference (changing the ins, making it a no-op, etc.); the partitioning appeared to be what was triggering the error.

Our hypothesis is that we are overloading something by having too many partitioned assets, or too many widely partitioned assets, in a single code location. We have a handful of assets with ~2600 partitions and tens of assets with 150-400 partitions.
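
For concreteness, the shape of our definitions looks roughly like the sketch below. The second asset name and the static partition keys are illustrative placeholders rather than our actual code:

from dagster import (
    Definitions,
    MonthlyPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)

# The newly added monthly partitions definition (~100 partitions); assumed
# here to back mpc_precovery_asset.
monthly_partitions = MonthlyPartitionsDefinition(start_date="2016-01-01")

# One of the tens of statically partitioned assets with a few hundred keys
# (placeholder key names).
static_partitions = StaticPartitionsDefinition([f"window_{i}" for i in range(400)])


@asset(partitions_def=monthly_partitions)
def mpc_precovery_asset(context) -> None:
    context.log.info(f"materializing partition {context.partition_key}")


@asset(partitions_def=static_partitions)
def another_partitioned_asset(context) -> None:
    context.log.info(f"materializing partition {context.partition_key}")


defs = Definitions(assets=[mpc_precovery_asset, another_partitioned_asset])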

What did you expect to happen?

We expected to be able to handle an arbitrary (or at least, much larger) number of partitions and partitioned assets in a single code location.

How to reproduce?

We have not created a minimal reproducible test case, but I would expect to see this issue with partitioned assets similar in scale to those described above.

Deployment type

Dagster Helm chart

Deployment details

Using Garden for deployment
Running on GKE
k8s version 1.26.11
We have lowered our default pod spec to request 200m CPU / 500MB memory; this is reflected in the user deployment and asset job runner pods
dagster-postgresql was migrated from k8s to Cloud SQL in production (this issue was seen in both environments)

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

@ntellis ntellis added the type: bug Something isn't working label Feb 7, 2024
@garethbrickman garethbrickman added the area: partitions Related to Partitions label Feb 7, 2024
@joswhi0

joswhi0 commented Feb 7, 2024

I recently started experiencing the same issue with statically partitioned assets (running version 1.5.5). However, now and then the materialization starts but then crashes with a different error (shown below). I haven't found a pattern for which of the two errors will occur or when; simply rerunning the same asset and partition in the same deployment may result in either error.

dagster._core.errors.DagsterSubprocessError: During multiprocess execution errors occurred in child processes:
In process 42424: dagster._core.errors.DagsterExecutionStepNotFoundError: Can not build subset plan from unknown step: test__partitioned_asset

Stack Trace:
  File "C:\DevOps\orca\_secondary\orca\env\Lib\site-packages\dagster\_core\executor\child_process_executor.py", line 79, in _execute_command_in_child_process
    for step_event in command.execute():
  File "C:\DevOps\orca\_secondary\orca\env\Lib\site-packages\dagster\_core\executor\multiprocess.py", line 78, in execute
    execution_plan = create_execution_plan(
                     ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\DevOps\orca\_secondary\orca\env\Lib\site-packages\dagster\_core\execution\api.py", line 736, in create_execution_plan
    return ExecutionPlan.build(
           ^^^^^^^^^^^^^^^^^^^^
  File "C:\DevOps\orca\_secondary\orca\env\Lib\site-packages\dagster\_core\execution\plan\plan.py", line 1062, in build
    ).build()
      ^^^^^^^
  File "C:\DevOps\orca\_secondary\orca\env\Lib\site-packages\dagster\_core\execution\plan\plan.py", line 221, in build
    plan = plan.build_subset_plan(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\DevOps\orca\_secondary\orca\env\Lib\site-packages\dagster\_core\execution\plan\plan.py", line 857, in build_subset_plan
    raise DagsterExecutionStepNotFoundError(


  File "C:\DevOps\orca\_secondary\orca\env\Lib\site-packages\dagster\_core\execution\api.py", line 766, in job_execution_iterator
    for event in job_context.executor.execute(job_context, execution_plan):
  File "C:\DevOps\orca\_secondary\orca\env\Lib\site-packages\dagster\_core\executor\multiprocess.py", line 311, in execute
    raise DagsterSubprocessError(

@ntellis
Author

ntellis commented Feb 7, 2024

I have seen the exact same error. The one you describe happens fairly consistently on retry of a failing materialization as well.

@OwenKephart
Contributor

Hi @ntellis @joswhi0 -- my guess is that the underlying issue stems from the process that generates "phantom jobs" to allow for ad-hoc materializations. When you build a Definitions object, Dagster generates one job under the hood for each distinct partitions definition, which serves as the vessel for kicking off ad-hoc materialization requests.

Each of these jobs is named sequentially (i.e. __ASSET_JOB_{N}), and the intention is for them to be created in a stable sorted order, so that different processes generating these jobs always assign the same number to the same logical job.

That seems to be the piece that's going wrong here (rather than any sort of performance issue). My initial guess is that your StaticPartitionsDefinition objects are being generated from something like the following:

from dagster import StaticPartitionsDefinition

# Sets have no stable iteration order across processes.
my_partitions_1 = {"apple", "banana", "pear"}
my_partitions_2 = {"artichoke", "broccoli", "peas"}

pd1 = StaticPartitionsDefinition(list(my_partitions_1))
pd2 = StaticPartitionsDefinition(list(my_partitions_2))

Depending on the random ordering of the sets, the repr of either of these partitions definitions may be alphabetically first, leading to a sort that's unstable across processes.

Does this ring a bell at all? I can put up a PR to ensure stable key ordering but I'd like to make sure that's actually the source of the problem before going down that route.
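
To see the ordering issue in isolation, here's a rough sketch (nothing Dagster-specific, and not the exact internals): string hashing is randomized per process, so iterating a set can yield a different order in each process that rebuilds the Definitions object.

# Run this a couple of times with different hash seeds and compare the output
# (the script name is just a placeholder):
#   PYTHONHASHSEED=1 python check_ordering.py
#   PYTHONHASHSEED=2 python check_ordering.py
my_partitions_1 = {"apple", "banana", "pear"}
my_partitions_2 = {"artichoke", "broccoli", "peas"}

# The lists, and anything derived from them (such as a repr used as a sort
# key), can come out in a different order per process.
print(list(my_partitions_1))
print(list(my_partitions_2))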

@joswhi0

joswhi0 commented Feb 27, 2024

@OwenKephart - thanks for that explanation! I just did a test run where I sorted the partition keys before passing them to StaticPartitionsDefinition and things are running successfully so far. I'll do several more tests with my other partition defs and let you know if the prior errors pop up again, but that looks promising.

@joswhi0

joswhi0 commented Feb 28, 2024

It seems like that fixed the problem, so on my end, I'll insert a sorting step whenever I'm generating the list of keys programmatically.
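
For reference, the sorting step is just the following sketch (the keys shown are placeholders for the ones I generate):

from dagster import StaticPartitionsDefinition

# Programmatically generated keys; the source collection has no guaranteed order.
generated_keys = {"run_003", "run_001", "run_002"}

# Sorting before constructing the definition gives every process the same key
# order, so the derived __ASSET_JOB_{N} numbering stays stable.
partitions_def = StaticPartitionsDefinition(sorted(generated_keys))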

Might a similar problem occur if I add another partition key to the partition definition between runs, or does Dagster generate a wholly separate list of jobs when the partition definition's serial ID changes? It seems like inserting a key could change the __ASSET_JOB_{N} name order. (I haven't run into this problem when adding new partition keys, so I assume something prevents it.)

@OwenKephart
Contributor

@joswhi0 In a typical production deployment (i.e. one using a Docker run launcher or similar), any run launched from the UI executes against the latest Docker image. So if you launch a run against one __ASSET_JOB_{X}, then update your code, then launch another run against the same assets that is now mapped to __ASSET_JOB_{Y}, both runs can coexist without issue.

@akoumjian

@OwenKephart I will test, but that does not seem likely in our case. The issue seems to occur by virtue of reaching a threshold in the number of partitions in the user deployment. Note that the problem disappears when we move identical partitioned assets to other user deployments, and it tracks with adding or removing different partitioned assets (it doesn't matter which) that push the total number over some unknown limit.

It seems much more likely that something is querying the entire list of phantom jobs or partitioned assets and, due to some limit, the reference in question is not where it is expected to be. Perhaps where the phantom job definitions are generated, there is a SQL or GraphQL query that is hitting some large pagination limit?

@yuhan yuhan removed the area: partitions Related to Partitions label Apr 2, 2024