
Partitioned jobs with partitioned source assets as input #13357

Open
ryanmeekins opened this issue Apr 4, 2023 · 5 comments

@ryanmeekins

What's the use case?

I want to use a partitioned source asset as input to a partitioned job, where each job run executes for a single partition and reads the corresponding partition of the upstream source asset. You can currently define a partitions_def for both a SourceAsset and a @job; however, there is no mapping between them, so every partition of the SourceAsset is read during job execution.
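
A minimal sketch of the setup I'm describing (the names, keys, and partition range below are illustrative only):

from dagster import (
    DailyPartitionsDefinition,
    OpExecutionContext,
    SourceAsset,
    job,
    op,
)

daily = DailyPartitionsDefinition(start_date="2023-01-01")

# Partitioned source asset, materialized by another code location or system.
events = SourceAsset(key="events", partitions_def=daily)

@op
def process_events(context: OpExecutionContext, events):
    # Desired: `events` holds only the partition targeted by this run;
    # today, every partition of the source asset is loaded instead.
    context.log.info(f"processing partition {context.partition_key}")

@job(partitions_def=daily)
def process_events_job():
    process_events(events)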

Ideas of implementation

Ideally, the partition mapping would happen automatically when the same PartitionsDefinition is used, just as it does for partitioned asset jobs defined with define_asset_job.
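
For comparison, a minimal sketch of the asset-job case where this mapping already happens automatically (illustrative names):

from dagster import (
    AssetExecutionContext,
    DailyPartitionsDefinition,
    Definitions,
    asset,
    define_asset_job,
)

daily = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily)
def upstream(context: AssetExecutionContext):
    return {"date": context.partition_key}

@asset(partitions_def=daily)
def downstream(upstream):
    # A run for partition "2023-01-02" loads only that partition of `upstream`.
    return upstream

# Each run of this asset job targets a single partition of both assets.
daily_job = define_asset_job("daily_job", selection=[upstream, downstream])

defs = Definitions(assets=[upstream, downstream], jobs=[daily_job])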

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

@sryza
Contributor

sryza commented Apr 4, 2023

Thanks for filing this, @ryanmeekins. I think we have two options for how to implement this:

  1. Say that, if the PartitionsDefinition on the job is the same as the PartitionsDefinition on the SourceAsset, then read only the partition targeted by the run.
  2. Allow the user to explicitly provide a PartitionMapping between the PartitionsDefinition on the job and the PartitionsDefinition on the SourceAsset.

The advantage of option 1 is that it doesn't require any new API surface area and is simpler for users. The disadvantage of option 1 is that it's some fancy logic under the covers, which users might find unexpected.
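
For reference, the existing asset-level API that option 2 could mirror looks roughly like this (a PartitionMapping declared explicitly on an asset input; the job-level equivalent described in option 2 would be new API, so this is only an analogy):

from dagster import (
    AssetIn,
    DailyPartitionsDefinition,
    TimeWindowPartitionMapping,
    asset,
)

daily = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily)
def upstream():
    ...

@asset(
    partitions_def=daily,
    ins={
        "upstream": AssetIn(
            # Explicit mapping: each downstream partition reads the previous
            # day's partition of `upstream`.
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1),
        )
    },
)
def downstream(upstream):
    ...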

@sryza sryza changed the title Partitioned Jobs with Partitioned Source Assets as Input Partitioned jobs with partitioned source assets as input Apr 11, 2023
@dmsfabiano

This would be an awesome feature. Is this on the near-term roadmap?

@louis-jaris

👍 this feature request

Related to this, I am seeing this in the documentation: https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#loading-an-asset-as-an-input

[Screenshot (2024-03-29) of the "Loading an asset as an input" section of the docs, quoted in the next comment]

However, the described behaviour is not respected (as of dagster==1.6.13): my asset is still injected into my op as a dict containing all of its partitions, even though my job is partitioned using the same partitions_def=.

from dagster import (
    DefaultSensorStatus,
    EventLogEntry,
    OpExecutionContext,
    Output,
    RunRequest,
    SensorEvaluationContext,
    asset_sensor,
    job,
    op,
)

from marketing_exporter import assets, configurations, partitions
# SendgridApiResource is a project-defined resource; its import is omitted here.


# Below never gets to execute, because `contacts_to_export` is actually a
# `dict[str, list[dict]]` -- it contains all the partitions
@op
def export_sendgrid_contacts_from_asset(
        context: OpExecutionContext,
        contacts_to_export: list[dict],
        sendgrid_api: SendgridApiResource) -> Output[str]:
    print(f"{contacts_to_export=}")
    # Omitted because it crashes before...


@job(
    partitions_def=partitions.partners_with_marketing
)
def export_to_sendgrid():
    """Export the owners with active pre-offer as SendGrid contacts."""
    # The partitioned source asset is passed directly as the op's input.
    export_sendgrid_contacts_from_asset(assets.contacts_to_export.to_source_asset())


# Fan out one run per partition whenever the upstream asset is materialized.
@asset_sensor(
    asset_key=assets.contacts_to_export.key,
    job=export_to_sendgrid,
    default_status=DefaultSensorStatus.RUNNING,
)
def observe_for_changes_in_owners_for_marketing(context: SensorEvaluationContext, asset_event: EventLogEntry):
    assert asset_event.dagster_event and asset_event.dagster_event.asset_key
    context.log.info("Detected changes in the asset=%s for partition=%s",
                     asset_event.dagster_event.asset_key, asset_event.dagster_event.partition)
    for partition_key in partitions.partners_with_marketing.get_partition_keys():
        yield RunRequest(
            partition_key=partition_key,
            run_key=context.cursor,
        )

@louis-jaris

I had to git-blame the documentation to get more context on the implementation and on the actual meaning of the doc: #12597

From https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#loading-an-asset-as-an-input:

We must use the AssetsDefinition.to_source_asset, because SourceAssets are used to represent assets that other assets or jobs depend on, in settings where they won't be materialized themselves.

If the asset is partitioned, then:

  • If the job is partitioned, the corresponding partition of the asset will be loaded.
  • If the job is not partitioned, then all partitions of the asset will be loaded. The type that they will be loaded into depends on the I/O manager implementation.

But reading #12597 (comment), it is evident that the documentation is misleading and misrepresents what actually happens, @sryza


Today's implementation (loading all the partitions into memory at the same time) is not memory-friendly when large assets are returned. In other words, if we have a static partitions definition composed of ['partner_1', 'partner_2', 'partner_3'], and every partition produces a 2GB asset, the data pipeline cannot scale horizontally; it has to scale vertically, which is not always an option...
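
The only way I see to keep memory bounded today is to bypass the asset input and load a single partition inside the op from the run's partition key. A rough sketch, where load_contacts_for_partner is a placeholder for project-specific loading code:

from dagster import OpExecutionContext, job, op

from marketing_exporter import partitions


def load_contacts_for_partner(partner: str) -> list[dict]:
    # Placeholder: read only this partner's data from wherever the asset's
    # I/O manager stores it (e.g. one file or table per partition).
    raise NotImplementedError


@op
def export_sendgrid_contacts_for_partition(context: OpExecutionContext) -> None:
    partner = context.partition_key  # e.g. "partner_1"
    contacts = load_contacts_for_partner(partner)
    context.log.info("loaded %d contacts for %s", len(contacts), partner)


@job(partitions_def=partitions.partners_with_marketing)
def export_to_sendgrid_single_partition():
    export_sendgrid_contacts_for_partition()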


Any feedback on where this issue stands? Do you need help implementing it? It looks like UPathIOManager.load_partitions (code) is the culprit here, or maybe its invocation is?

@notsyncing

Any update on this? It's not always practical to load all asset partitions into memory.
