All partitions returned when calling .to_source_asset() #17045

Closed
czroth opened this issue Oct 5, 2023 · 2 comments
Labels
type: bug Something isn't working

czroth commented Oct 5, 2023

Dagster version

1.5.0

What's the issue?

I'm following a partitioned asset job with a partitioned op job: a multi_asset_sensor kicks off the op job when the asset is materialized.

I expect the call to send_message below to receive the asset value for a single partition, according to the documentation note at https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#loading-an-asset-as-an-input:

If the asset is partitioned, then:
If the job is partitioned, the corresponding partition of the asset will be loaded.
If the job is not partitioned, then all partitions of the asset will be loaded. The type that they will be loaded into depends on the I/O manager implementation.
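
The rule quoted above can be modeled in plain Python. This is only an illustrative sketch of the documented behavior, not Dagster's implementation: `partitions_to_load` and its parameters are hypothetical names made up for this example.

```python
from typing import List, Optional, Sequence


def partitions_to_load(
    asset_partitions: Sequence[str], job_partition_key: Optional[str]
) -> List[str]:
    """Model of the documented rule (hypothetical helper, not a Dagster API)."""
    if job_partition_key is None:
        # Unpartitioned job: every partition of the asset is loaded.
        return list(asset_partitions)
    if job_partition_key not in asset_partitions:
        raise KeyError(f"unknown partition: {job_partition_key!r}")
    # Partitioned job: only the corresponding partition is loaded.
    return [job_partition_key]
```

Under this reading, a run of a partitioned job for "banana" should load only the "banana" partition, which is the behavior I expected.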

What did you expect to happen?

I expected asset.to_source_asset() in a partitioned job to send just the asset for that partition to the op. Instead, I got a dictionary with info for all partitions.

How to reproduce?

from dagster import (
    AssetKey,
    Definitions,
    RunConfig,
    RunRequest,
    StaticPartitionsDefinition,
    asset,
    define_asset_job,
    graph_asset,
    job,
    multi_asset_sensor,
    op,
)

fruits = StaticPartitionsDefinition(["apple", "banana", "cherry"])


@op
def complicated_step(context):
    context.log.info("Complicated step")
    return "foo"


@op
def get_color(context, complicated_step_output):
    fruit = context.partition_key
    context.log.info(f"Getting color for {fruit} after {complicated_step_output=}")
    if fruit in ["apple", "cherry"]:
        return "red"
    elif fruit == "banana":
        return "yellow"


@graph_asset(partitions_def=fruits)
def fruit_color_asset():
    return get_color(complicated_step())


@op
def send_slack_message(context):
    fruit = context.partition_key
    context.log.info(f"Sending slack message for {fruit=}")


@asset(partitions_def=fruits)
def upstream_asset(context):
    context.log.info(f"Upstream asset for {context.partition_key=}")
    return 1


@op
def add_one(context, input_num):
    context.log.info(f"add_one op for {context.partition_key=}")
    return input_num + 1


@op
def multiply_by_two(context, input_num):
    context.log.info(f"multiply_by_two op for {context.partition_key=}")
    return input_num * 2


@graph_asset(partitions_def=fruits)
def middle_asset(upstream_asset):
    return multiply_by_two(add_one(upstream_asset))


@asset(partitions_def=fruits)
def downstream_asset(context, middle_asset):
    context.log.info(f"Downstream asset for {context.partition_key=}")
    res = f"{middle_asset + 7}, {context.partition_key}"
    context.log.info(res)
    return res


@op
def send_message(context, downstream) -> None:
    # I shouldn't have to do a dictionary call here, but I do
    downstream = downstream[context.partition_key]
    context.log.info(f"Output op for {context.partition_key=}")
    context.log.info(downstream)


all_assets_job = define_asset_job(
    name="stream_assets_job",
    selection=["upstream_asset", "middle_asset", "downstream_asset"],
)


@job(partitions_def=fruits)
def my_job():
    # Expectation is that .to_source_asset() will be called on the correct partition, but it isn't
    send_message(downstream=downstream_asset.to_source_asset())


@multi_asset_sensor(
    monitored_assets=[
        AssetKey("downstream_asset"),
    ],
    job=my_job,
)
def multi_asset_sensor_test(context):
    run_requests = []
    print(dir(context))
    for (
        partition,
        materializations_by_asset,
    ) in context.latest_materialization_records_by_partition_and_asset().items():
        run_requests.append(
            RunRequest(
                partition_key=partition,
                run_config=RunConfig(),
            )
        )
        for asset_key, materialization in materializations_by_asset.items():
            print(asset_key)
            print(materialization)
            if asset_key in context.asset_keys:
                context.advance_cursor({asset_key: materialization})
    return run_requests


defs = Definitions(
    assets=[fruit_color_asset, upstream_asset, middle_asset, downstream_asset],
    jobs=[all_assets_job, my_job],
    sensors=[multi_asset_sensor_test],
)
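
Until the behavior changes, the dictionary lookup in send_message can be made tolerant of both shapes. The following is a minimal sketch in plain Python, assuming the loaded value is either the single partition's value or a mapping keyed by partition key; `value_for_partition` is a hypothetical helper, not part of Dagster.

```python
from typing import Any, Mapping


def value_for_partition(loaded: Any, partition_key: str) -> Any:
    """Return the value for one partition (hypothetical helper, not a Dagster API).

    Assumes `loaded` is either the partition's own value or a mapping from
    partition key to value, as observed in this report.
    """
    if isinstance(loaded, Mapping) and partition_key in loaded:
        # All partitions were loaded: pick out the one for this run.
        return loaded[partition_key]
    # Only the requested partition was loaded: use it as-is.
    return loaded
```

Inside the op this would be called as `downstream = value_for_partition(downstream, context.partition_key)`. The check is ambiguous if the asset's own value is a mapping that happens to contain the partition key, which is not the case in this repro because downstream_asset returns a string.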

Deployment type

None

Deployment details

Just using dagster dev after a fresh scaffold.

Additional information

May be related: #12586 and #17038

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

czroth added the type: bug label Oct 5, 2023

sryza commented Oct 5, 2023

Thanks for filing this @czroth. I believe it is a duplicate of #13357. Let us know if you disagree.

sryza closed this as completed Oct 5, 2023

czroth commented Oct 6, 2023

I agree. I'll participate in the other thread.
