How can I create a scheduler for MultiPartitionKey assets?
I'm trying to do:
```
from dagster import (
    AssetSelection,
    MultiPartitionsDefinition,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

ncar_jobs = {
    key: define_asset_job(
        key + "_job",
        selection=AssetSelection.assets(ast),
        partitions_def=MultiPartitionsDefinition(ncar_partition_defs[key]),
    )
    for key, ast in ncar_assets.items()
}
ncar_schedules = {
    key: build_schedule_from_partitioned_job(
        job,
        name=key + "_schedule",
    )
    for key, job in ncar_jobs.items()
}
```
where each `ast` is a multi-partitioned asset, and the jobs are multi-partitioned as well.
U047FUX67PD: btw, each `ncar_partition_defs[key]` looks like:
```
{
    "date": daily_partition_definition,
    "fct_step": fct_step_partition_definition,
}
```
The assets and jobs load fine.
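To see why a daily schedule needs more than one run per tick, it helps to picture the key space a two-dimensional definition like this produces: every combination of a `date` key and an `fct_step` key is its own partition. A minimal, dependency-free sketch, assuming hypothetical `fct_step` values `step_a`/`step_b` and using plain dicts as stand-ins for Dagster's `MultiPartitionKey` (this is not Dagster's API):

```python
from datetime import date, timedelta
from itertools import product
from typing import Dict, List


def daily_keys(start: date, end: date) -> List[str]:
    """Daily partition keys (YYYY-MM-DD) for each day in [start, end)."""
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days)]


def multipartition_keys(dimensions: Dict[str, List[str]]) -> List[Dict[str, str]]:
    """Cartesian product of per-dimension keys; each result maps dimension name -> key."""
    names = sorted(dimensions)  # fixed dimension order for a deterministic result
    return [dict(zip(names, combo)) for combo in product(*(dimensions[n] for n in names))]


# Hypothetical stand-ins for daily_partition_definition and fct_step_partition_definition.
dims = {
    "date": daily_keys(date(2022, 11, 28), date(2022, 11, 30)),
    "fct_step": ["step_a", "step_b"],
}
keys = multipartition_keys(dims)
print(len(keys))  # 2 dates x 2 fct_steps = 4 partitions
for key in keys:
    print(key)
```

So a schedule that runs "every day, across all `fct_step` partitions" has to request one run per `fct_step` key for that day, not a single run.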
U028M11QNDD: Hi Airton. How do you want your scheduler to work? Do you want the job to run on a daily basis, with a run request for each multi-partition key for the current day?
U047FUX67PD: Just a question: if I want to run a multi-partitioned job just once, what is the best way to do that?
U047FUX67PD: About the scheduler, I need the "date" partition to run every day, covering all "fct_step" partitions, treating "fct_step" like a secondary partition.
U028M11QNDD: It's probably easiest to select the specific partition from the Dagit UI. Alternatively, you could use `job.execute_in_process(partition_key=MultiPartitionKey({"dim_1_name": "dim_1_partition_key", "dim_2_name": "dim_2_partition_key"}))`.
U047FUX67PD: I think that could solve my problem, indeed, thanks!
U028M11QNDD: We don't have a built-in way to build a schedule from a multi-partitioned job, but you could define a custom schedule by doing something like this (untested code below):
```
from typing import List, Mapping

from dagster import MultiPartitionsDefinition, schedule

# `static_partitions`, `time_window_partitions`, and `my_job` are placeholders
# for your own partitions definitions and multi-partitioned job.
composite = MultiPartitionsDefinition(
    {
        "abc": static_partitions,
        "date": time_window_partitions,
    }
)


def get_multipartition_keys_with_dimension_value(
    partition_def: MultiPartitionsDefinition, dimension_values: Mapping[str, str]
) -> List[str]:
    """Return the multi-partition keys whose dimensions match every given value."""
    matching_keys = []
    for partition_key in partition_def.get_partition_keys():
        # `keys_by_dimension` is accessed as a property on MultiPartitionKey.
        keys_by_dimension = partition_key.keys_by_dimension
        if all(
            keys_by_dimension.get(dimension) == value
            for dimension, value in dimension_values.items()
        ):
            matching_keys.append(partition_key)
    return matching_keys


@schedule(
    cron_schedule=time_window_partitions.get_cron_schedule(),
    job=my_job,
    execution_timezone=time_window_partitions.timezone,
    name="my_schedule",
)
def schedule_def(context):
    time_partitions = time_window_partitions.get_partition_keys(
        context.scheduled_execution_time
    )
    # Run for the latest time partition. Prior partitions will have been handled by prior ticks.
    curr_date = time_partitions[-1]
    # One run request per multi-partition key on that date (i.e. one per "abc" key).
    for multipartition_key in get_multipartition_keys_with_dimension_value(
        composite, {"date": curr_date}
    ):
        yield my_job.run_request_for_partition(
            partition_key=multipartition_key,
            run_key=multipartition_key,
        )
```
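The per-tick logic of that schedule can be checked without a Dagster instance. The sketch below is a dependency-free simulation, not Dagster's API: plain dicts stand in for `MultiPartitionKey`, `latest_complete_day` is a hypothetical helper approximating what `get_partition_keys(scheduled_execution_time)[-1]` yields for a daily definition with the default end offset (the last day whose window has fully ended), and the `"|"`-joined run-key format is an assumption.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Mapping


def latest_complete_day(scheduled_time: datetime) -> str:
    """Hypothetical helper: the most recent day whose window has fully ended."""
    return (scheduled_time.date() - timedelta(days=1)).isoformat()


def matching_keys(
    all_keys: List[Dict[str, str]], dimension_values: Mapping[str, str]
) -> List[Dict[str, str]]:
    """Same filter as get_multipartition_keys_with_dimension_value, on plain dicts."""
    return [
        key
        for key in all_keys
        if all(key.get(dim) == value for dim, value in dimension_values.items())
    ]


def run_requests_for_tick(
    all_keys: List[Dict[str, str]], scheduled_time: datetime
) -> List[Dict[str, object]]:
    """Simulate one tick: one request per key on the latest complete date."""
    curr_date = latest_complete_day(scheduled_time)
    requests = []
    for key in matching_keys(all_keys, {"date": curr_date}):
        # Assumed run-key format: values joined by "|" in sorted dimension-name order,
        # so re-evaluating the same tick produces the same run keys.
        run_key = "|".join(key[dim] for dim in sorted(key))
        requests.append({"partition_key": key, "run_key": run_key})
    return requests


all_keys = [
    {"date": d, "fct_step": s}
    for d in ["2022-11-28", "2022-11-29"]
    for s in ["step_a", "step_b"]
]
for request in run_requests_for_tick(all_keys, datetime(2022, 11, 30)):
    print(request["run_key"])
```

Because the filter scans every multi-partition key, its cost grows with the length of the date history. When the other dimension is static, an alternative worth considering is to loop over that dimension's keys and build each key directly, e.g. `MultiPartitionKey({"date": curr_date, "fct_step": step})`, mirroring the `execute_in_process` example in this thread.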
U028M11QNDD: I can file a feature request for this, since it would be nice to have this functionality built in
U028M11QNDD: <@U018K0G2Y85> issue build schedule from multi-partitioned job
---
#### Message from the maintainers:
Do you care about this too? Give it a :thumbsup:. We factor engagement into prioritization.
Issue from the Dagster Slack
This issue was generated from the slack conversation at: https://dagster.slack.com/archives/C01U954MEER/p1669753133630289?thread_ts=1669753133.630289&cid=C01U954MEER