Allow specifying PartitionMapping when non-asset jobs depend on partitioned source assets #13940

Open
cleboo opened this issue Apr 27, 2023 · 3 comments


cleboo commented Apr 27, 2023

What's the use case?

I want to use the last partition of an asset as an input to a non-partitioned Job.

This can currently be done with an intermediate asset; however, that approach materializes a (potentially large) asset a second time.
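In Dagster terms, "the last partition" is the newest key in the asset's partition set (the partition `LastPartitionMapping` targets), not necessarily the most recently materialized one. A minimal stdlib sketch of that key computation follows, assuming a `DailyPartitionsDefinition` with its default `end_offset=0`, under which the current, still-running day is not yet a partition. `daily_partition_keys` and `last_partition_key` are hypothetical helpers for illustration, not Dagster functions.

```python
from datetime import date, timedelta
from typing import List


def daily_partition_keys(start: date, today: date) -> List[str]:
    """Hypothetical helper: one "YYYY-MM-DD" key per complete day in [start, today)."""
    keys = []
    day = start
    while day < today:  # assumption: the current day is still open, so it is not yet a partition
        keys.append(day.isoformat())
        day += timedelta(days=1)
    return keys


def last_partition_key(start: date, today: date) -> str:
    """Hypothetical helper: the key a last-partition mapping would resolve to."""
    keys = daily_partition_keys(start, today)
    if not keys:
        raise ValueError("no complete daily partitions yet")
    return keys[-1]


# Same start date as the asset in the proposal below; "today" is the issue date.
print(last_partition_key(date(2023, 4, 19), date(2023, 4, 27)))  # 2023-04-26
```

With these dates the partition set runs from 2023-04-19 through 2023-04-26, so the last partition is the previous day rather than the day the job runs.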

Ideas of implementation

My rough idea of what this would look like from the user's side (using partition mappings):

from dagster import (
    asset,
    op,
    graph,
    DailyPartitionsDefinition,
    OpExecutionContext,
    In,
    AssetIn,
    LastPartitionMapping,
    Definitions
)

partition = DailyPartitionsDefinition(start_date="2023-04-19")

@asset(partitions_def=partition)
def val(context: OpExecutionContext) -> str:
    return context.asset_partition_key_for_output()

@op(ins={'s': In()})
def add_string(context, s):
    """add --added to the string and log it"""
    context.log.info(str(s))
    s = s + "--added"
    context.log.info(s)
    return s

@graph
def generate_val_string():
    # Proposed usage: passing partition_mapping to to_source_asset()
    # is the feature being requested in this issue.
    return add_string(
        val.to_source_asset(partition_mapping=LastPartitionMapping())
    )


job = generate_val_string.to_job(
    "generate_last_date_string"
)

defs = Definitions(
    jobs=[job],
    assets=[val]
)
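To make the intended semantics concrete, here is a plain-Python model of what `generate_last_date_string` would compute if the proposed mapping existed: resolve the last partition key of `val`, load that partition's stored value, and pass it to `add_string`. The in-memory `storage` dict stands in for an IO manager and `load_last_partition` is a hypothetical helper; neither is Dagster code.

```python
from typing import Dict, List


def load_last_partition(partition_keys: List[str], storage: Dict[str, str]) -> str:
    """Hypothetical stand-in for loading an input through a last-partition mapping."""
    last_key = partition_keys[-1]  # the mapping targets the last key in the partition set
    if last_key not in storage:
        # Assumption: an unmaterialized upstream partition means there is nothing to load.
        raise KeyError(f"partition {last_key} of 'val' is not materialized")
    return storage[last_key]


def add_string(s: str) -> str:
    """Same string logic as the add_string op, without the logging."""
    return s + "--added"


# 'val' returns its own partition key, so each stored value equals its key.
keys = ["2023-04-24", "2023-04-25", "2023-04-26"]
storage = {k: k for k in keys}
print(add_string(load_last_partition(keys, storage)))  # 2023-04-26--added
```

The point of the proposal is that only this single stored partition is read; no second, non-partitioned copy of `val` has to be materialized first.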

Additional information

Original discussion: #13918

Related to #13357

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

sryza changed the title from "Let Jobs select the last partition of an asset" to "Allow specifying PartitionMapping when non-asset jobs depend on partitioned source assets" on Apr 28, 2023
sryza (Contributor) commented Apr 28, 2023

I think the main question here is how exactly the PartitionMapping should be specified. I don't think it makes sense to make it an argument to AssetsDefinition.to_source_asset, because source assets themselves don't have a PartitionMapping. Also, what if you wanted to depend on something that's already a SourceAsset, not an AssetsDefinition?

Maybe something like:

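from dagster import DailyPartitionsDefinition, LastPartitionMapping, asset, job

@asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"))
def emails():
    ...

@job
def my_job():
    # Proposed API: OpAssetInput would attach a PartitionMapping to this input.
    # send_emails is an op defined elsewhere.
    send_emails(OpAssetInput(emails, partition_mapping=LastPartitionMapping()))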
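The argument here is that the mapping describes the dependency edge, not the source asset, so one upstream asset can be read through different mappings by different consumers. A plain-Python sketch of that shape follows. `OpAssetInput` mirrors the name proposed above, `UpstreamAsset` is a simplified stand-in (not Dagster's `SourceAsset`), and `last_partition`/`all_partitions` are hypothetical functions loosely modeled on Dagster's `LastPartitionMapping` and `AllPartitionMapping`, not their actual implementations.

```python
from dataclasses import dataclass
from typing import Callable, List

# Here a partition mapping is just a function from upstream keys to the keys an input needs.
PartitionMappingFn = Callable[[List[str]], List[str]]


def last_partition(upstream_keys: List[str]) -> List[str]:
    return upstream_keys[-1:]


def all_partitions(upstream_keys: List[str]) -> List[str]:
    return list(upstream_keys)


@dataclass(frozen=True)
class UpstreamAsset:
    key: str
    partition_keys: List[str]  # the asset itself carries no mapping


@dataclass(frozen=True)
class OpAssetInput:
    asset: UpstreamAsset
    partition_mapping: PartitionMappingFn  # the mapping lives on the edge

    def upstream_partition_keys(self) -> List[str]:
        return self.partition_mapping(self.asset.partition_keys)


emails = UpstreamAsset("emails", ["2020-01-01", "2020-01-02", "2020-01-03"])
# Two consumers of the same asset, each with its own mapping.
print(OpAssetInput(emails, last_partition).upstream_partition_keys())  # ['2020-01-03']
print(OpAssetInput(emails, all_partitions).upstream_partition_keys())
```

Because the asset object never changes, this works the same whether the upstream was defined as an asset or is already a source asset, which is the case raised in the comment.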

cleboo (Author) commented May 2, 2023

Makes sense to me 🙂 That could also be the manual way to specify the partition mapping for that issue: #13357

dmsfabiano commented:

@sryza @cleboo This would be awesome; it would also allow us to dynamically trigger jobs that depend on a specific partition via parameters (#16524).
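The parameterized variant would need the partition to be chosen at launch time rather than fixed in the job definition. A hypothetical sketch: the selection falls back to the last partition unless a run parameter names a specific one, which is then validated against the upstream partition set. The `partition_key` entry in a plain dict is an assumption for illustration, not Dagster's run-config schema.

```python
from typing import Any, Dict, List, Optional


def select_partition(upstream_keys: List[str], run_params: Dict[str, Any]) -> str:
    """Hypothetical: pick the upstream partition to load for this run."""
    requested: Optional[str] = run_params.get("partition_key")
    if requested is None:
        return upstream_keys[-1]  # default behaves like a last-partition mapping
    if requested not in upstream_keys:
        raise ValueError(f"unknown partition key: {requested}")
    return requested


keys = ["2023-04-24", "2023-04-25", "2023-04-26"]
print(select_partition(keys, {}))  # 2023-04-26
print(select_partition(keys, {"partition_key": "2023-04-25"}))  # 2023-04-25
```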

Projects: None yet
Development: No branches or pull requests
3 participants