
Upstream_output.definition_metadata doesn't contain the metadata since 1.7.11 for observable Source Assets [REGRESSION] #22789

Closed
ion-elgreco opened this issue Jul 1, 2024 · 8 comments · Fixed by #22862
Labels
area: asset (Related to Software-Defined Assets) · area: metadata (Related to metadata) · type: bug (Something isn't working)

Comments

@ion-elgreco
Contributor

ion-elgreco commented Jul 1, 2024

Dagster version

1.7.11

What's the issue?

Since v1.7.11, `InputContext.upstream_output.definition_metadata` no longer contains the metadata of the input asset. This is quite problematic, since we have IO managers that rely on an asset's metadata.

With v1.7.11: [screenshot]
With v1.7.10: [screenshot]
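For context, a minimal sketch of the kind of IO manager that this breaks (not the reporter's actual code; the "path" metadata key and the load logic are hypothetical):

from dagster import IOManager


class MetadataAwareIOManager(IOManager):
    def load_input(self, context):
        # On 1.7.10 this dict contained the asset's declared metadata;
        # on 1.7.11 the user-provided entries are missing.
        meta = context.upstream_output.definition_metadata
        # Decide how to load based on declared metadata ("path" is a
        # hypothetical user-provided key).
        return f"would load from {meta['path']}"

    def handle_output(self, context, obj):
        pass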

What did you expect to happen?

No response

How to reproduce?

No response

Deployment type

None

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

@ion-elgreco ion-elgreco added the type: bug Something isn't working label Jul 1, 2024
@ion-elgreco ion-elgreco changed the title Upstream_output.definition_metadata doesn't contain the metadata since 1.7.11 [REGRESSION] Upstream_output.definition_metadata doesn't contain the metadata since 1.7.11 for Source Assets [REGRESSION] Jul 1, 2024
@ion-elgreco
Contributor Author

@garethbrickman Can someone look into this? This breaks all IO manager implementations for source assets.

@garethbrickman garethbrickman added area: asset Related to Software-Defined Assets area: metadata Related to metadata labels Jul 1, 2024
@sverbruggen

@garethbrickman Would really appreciate it if you could make some time for this; it seems like a relatively major issue.

@OwenKephart
Contributor

Hi @sverbruggen @ion-elgreco, are either of you able to create a minimal reproduction of this issue? My attempts haven't been successful; I've generally been trying things along these lines:

from dagster import (
    AssetIn,
    AssetKey,
    Definitions,
    IOManager,
    IOManagerDefinition,
    SourceAsset,
    asset,
)


def test_input_manager_with_source_assets() -> None:
    fancy_metadata = {"foo": "bar", "baz": 1.23}

    upstream = SourceAsset("upstream", metadata=fancy_metadata)

    @asset(ins={"upstream": AssetIn(input_manager_key="special_io_manager")})
    def downstream(upstream) -> int:
        return upstream + 1

    class MyIOManager(IOManager):
        def load_input(self, context) -> int:
            assert context.upstream_output is not None
            assert context.upstream_output.asset_key == AssetKey(["upstream"])
            # Expect the user-declared metadata to round-trip intact.
            assert context.upstream_output.definition_metadata == fancy_metadata
            return 2

        def handle_output(self, context, obj) -> None: ...

    defs = Definitions(
        assets=[upstream, downstream],
        resources={"special_io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())},
    )
    job = defs.get_implicit_job_def_for_assets([downstream.key])
    assert job is not None
    output = job.execute_in_process()

    assert output._get_output_for_handle("downstream", "result") == 3  # noqa: SLF001
    assert False  # deliberate failure so pytest prints captured output

There was a change that resulted in those system-generated metadata keys (e.g. `dagster/asset_execution_type`) no longer appearing, but that wouldn't impact the user-provided metadata in the way you're experiencing.

@ion-elgreco
Contributor Author

@OwenKephart I have an MRE in my code base, will share it when I arrive back home tonight!

@OwenKephart
Contributor

Thank you!

@ion-elgreco
Contributor Author

ion-elgreco commented Jul 5, 2024

@OwenKephart found a way to reproduce it with a slight modification of your example:

from dagster import (
    AssetIn,
    AssetKey,
    DataVersion,
    IOManager,
    asset,
    materialize,
    observable_source_asset,
)


def test_input_manager_with_source_assets() -> None:
    fancy_metadata = {"foo": "bar", "baz": 1.23}

    # The key difference from the example above: an *observable* source
    # asset instead of a plain SourceAsset.
    @observable_source_asset(metadata=fancy_metadata)
    def upstream():
        return DataVersion("1")

    # upstream = SourceAsset("upstream", metadata=fancy_metadata)

    @asset(ins={"upstream": AssetIn(input_manager_key="special_io_manager")})
    def downstream(upstream) -> int:
        return upstream + 1

    class MyIOManager(IOManager):
        def load_input(self, context) -> int:
            assert context.upstream_output is not None
            assert context.upstream_output.asset_key == AssetKey(["upstream"])
            assert context.upstream_output.definition_metadata == fancy_metadata
            return 2

        def handle_output(self, context, obj) -> None: ...

    # Materialize directly instead of building a job.
    materialize(assets=[upstream, downstream], resources={"special_io_manager": MyIOManager()})
This fails with:

AssertionError: assert {'dagster/io_... 'io_manager'} == {'baz': 1.23, 'foo': 'bar'}
  Left contains 1 more item:
  {'dagster/io_manager_key': 'io_manager'}
  Right contains 2 more items:
  {'baz': 1.23, 'foo': 'bar'}
  Use -v to get more diff

The key is to use an observable source asset, and to materialize it directly instead of executing a job.

@ion-elgreco ion-elgreco changed the title Upstream_output.definition_metadata doesn't contain the metadata since 1.7.11 for Source Assets [REGRESSION] Upstream_output.definition_metadata doesn't contain the metadata since 1.7.11 for observable Source Assets [REGRESSION] Jul 5, 2024
@OwenKephart
Contributor

@ion-elgreco thanks for the reproduction, we should be able to get a fix in for next week's release

@ion-elgreco
Contributor Author

@OwenKephart thanks!! The quick fix is much appreciated.

danielgafni added a commit to danielgafni/dagster that referenced this issue Jul 9, 2024
remove vestigial T_Handle (dagster-io#22772)

This `TypeVar` is unused in `graph_definition.py` and has no effect in
`op_definition.py`.

[bug] Fix issue where metadata got discarded in SourceAsset conversion (dagster-io#22862)

Resolves: dagster-io#22789

Fixes an issue which I believe was introduced by
dagster-io#22165 (didn't dig that hard).

The core problem is this conversion:
https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster/dagster/_core/definitions/external_asset.py?L176

`observable_source_asset` does not set metadata on the
`OutputDefinition` of its internal op. So when we initially convert a
SourceAsset into an ol' fashioned `AssetsDefinition`, then call
`to_source_assets()` on that, we end up losing the metadata field, as we
were trying to pull it off of the `OutputDefinition`. This PR just pulls
it off of the spec instead.
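
A runnable toy model of that root cause (hypothetical classes, not Dagster's actual internals):

class OutputDefinition:
    def __init__(self, metadata=None):
        # observable_source_asset leaves this empty on its internal op
        self.metadata = metadata or {}

class AssetSpec:
    def __init__(self, metadata):
        # the user-declared metadata survives on the spec
        self.metadata = metadata

spec = AssetSpec({"foo": "bar", "baz": 1.23})
output_def = OutputDefinition()

# Before the fix: the conversion read from the OutputDefinition -> {}
assert output_def.metadata == {}

# After the fix: the conversion reads from the spec -> metadata intact
assert spec.metadata == {"foo": "bar", "baz": 1.23}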

Added test fails before, passes now.

[record] pickle-ability (dagster-io#22861)

support pickling for records since we need it in some places

added test

[RFC] Dagster Celery Runner (dagster-io#22601)

Hello, I would like to propose a new feature for the Dagster Celery
library by introducing a Dagster Celery Runner. The PR also includes the
changes described in this RFC. We have been running this code in
production for a while already and are confident that it is polished
enough to be considered for inclusion in Dagster.

Our workload consists of:
* Multiple assets, each with on average 300 partitions.
* Hourly schedule to materialize these assets (for each partition).
* Each materialization job takes between 10 and 50 seconds.

Using the Dagster K8s run launcher along with the single-process executor,
we noticed that:
* The workload puts too much load on our K8s cluster.
* Startup times become significant due to the small size of our jobs.
Short of overcomplicating the solution, this setup did not seem like the
way to go.

![K8s Runner drawio](https://github.com/dagster-io/dagster/assets/4254771/8a6805b6-accd-45e3-b8dc-37cec54e8561)

Our goal was to create a setup that:
* Supports our workload.
* Minimizes the run-launching overhead on our infrastructure.
* Maintains low job startup latency.
* Provides an at-least-once execution strategy (as runners like k8s do).
* Can scale horizontally.
* Provides run-level isolation.
* Minimizes downtime during deployment.

Non-goals:
* Op level isolation

The original
[dagster-celery](https://docs.dagster.io/deployment/guides/celery)
package provides an alternative method of launching runs by dividing op
execution across different workers.

This approach still relies on a runner to start the run, within which
the executor passes the tasks on to individual Celery workers. Using it
together with the K8s executor would not solve our problem, and
combining it with the Default Runner (which starts runs on the code
server) would introduce new scalability and deployment issues (see
diagram).

![Celery Executor drawio](https://github.com/dagster-io/dagster/assets/4254771/01e5e73e-c2af-4b33-a401-435bbb07061a)

While measuring the performance of the Dagster Celery executor, we
concluded that it had the desired run startup latency and the potential
to scale.

Our proposed approach introduces a Dagster Celery runner that mimics
the way the K8s runner works, but starts the runs directly on Dagster
Celery workers (see diagram).

![Celery Runner drawio](https://github.com/dagster-io/dagster/assets/4254771/3005bbd4-097b-4761-addd-28fb08bd182e)

The main difference in this approach is that instead of launching a
process containing an executor to monitor the steps, the task is
submitted directly to the Celery broker, which in turn finds a worker to
start the execution.

Once a task has been submitted to Celery, the daemon monitors the
process using standard job-monitoring requests, which run at a
configured interval. From the Celery backend, the monitoring can read
whether a job is 'in progress', 'done', or in an alternative state.
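
A minimal sketch of that per-interval check, assuming Redis as the result backend (the daemon-side wiring is simplified, and `poll_run_status` is a name made up for illustration):

from celery import Celery
from celery.result import AsyncResult

app = Celery(broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

def poll_run_status(task_id: str) -> str:
    # One cheap call to the Redis backend per monitoring interval.
    # Returns e.g. "PENDING", "STARTED" (with task_track_started=True),
    # "SUCCESS", or "FAILURE".
    return AsyncResult(task_id, app=app).state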

Our production setup runs workers on k8s and uses stateless Redis as
both the Celery broker and backend.

There are three edge cases when using Celery with a stateless broker and
backend:
1. The Redis broker crashes midway, before starting the job, causing the
queue to drop jobs before they are picked up.
2. The Redis backend crashes before the success result is communicated.
3. The worker is forcibly terminated, thus never communicating the
result to the Celery backend.

We mitigate case (1) by setting a sensible `start_timeout_seconds` for
run monitoring. If a job is not started in time, it will be marked as
failed and caught by the retry policy.

Case (2) is handled out of the box, as the worker still communicates the
job status to Dagster run storage. Once a job is marked as completed in
Dagster, the monitoring stops.

To handle case (3), we enforce a mandatory `dagster/max_runtime` tag on
all jobs, as it is otherwise impossible to distinguish a very slow job
from a terminated one.
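
For example, the tag-based mitigation for case (3) looks roughly like this (a sketch; the op and timeout value are illustrative, and `dagster/max_runtime` requires run monitoring to be enabled):

from dagster import job, op

@op
def refresh_partition():
    ...

# Without a runtime bound, a worker killed mid-run is indistinguishable
# from a slow run, so every job carries a mandatory max-runtime tag
# (value in seconds).
@job(tags={"dagster/max_runtime": "1800"})
def hourly_refresh():
    refresh_partition()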

Because the workers facilitate job execution, scaling is as easy as
adding more workers. Job monitoring is cheap because it only requires a
call to the Redis backend from the monitoring daemon.

To make the solution more flexible, we add a run-level tag specifying
the Celery queue to start the job on, thus allowing different workers to
process different runs (akin to the current op-level queue tag).

Celery supports warm shutdown, meaning a worker will stop accepting new
tasks once it receives a SIGTERM signal but will finish existing ones.
We combine this feature with the k8s termination grace period and the
default rolling update to spin up new workers that pick up new tasks.

Using Celery instead of k8s means losing the resource isolation that
jobs otherwise have. We use the default prefork pool along with the
`max-memory-per-child` and `max-tasks-per-child` settings to introduce
soft limits on worker processes.

* Created new tests based on the default runner tests.
* Created an example docker-compose setup.

---------

Signed-off-by: Egor Dmitriev <egor.dmitriev@alliander.com>

[asset differ] add "asset removed" change reason (dagster-io#22870)

Allows the asset graph differ to mark an asset as removed in the case
that an asset is present in the base branch but not the comparison
branch.

In the branch deployment case this is currently not used, but it is
useful for making the asset differ applicable in other use cases (e.g.
tracking change history on a prod branch).

New unit test.