Skip to content

Commit

Permalink
validate tags in partition sets (to turn dicts into strings) (#7554)
Browse files Browse the repository at this point in the history
  • Loading branch information
gibsondan committed Apr 25, 2022
1 parent 7452ec3 commit a141905
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
6 changes: 4 additions & 2 deletions python_modules/dagster/dagster/core/definitions/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
ScheduleDefinition,
ScheduleEvaluationContext,
)
from .utils import check_valid_name
from .utils import check_valid_name, validate_tags

DEFAULT_DATE_FORMAT = "%Y-%m-%d"

Expand Down Expand Up @@ -495,7 +495,9 @@ def run_config_for_partition(self, partition: Partition[T]) -> Dict[str, Any]:
return copy.deepcopy(self._user_defined_run_config_fn_for_partition(partition))

def tags_for_partition(self, partition: Partition[T]) -> Dict[str, str]:
user_tags = copy.deepcopy(self._user_defined_tags_fn_for_partition(partition))
user_tags = copy.deepcopy(
validate_tags(self._user_defined_tags_fn_for_partition(partition))
)
check_tags(user_tags, "user_tags")

tags = merge_dicts(user_tags, PipelineRun.tags_for_partition_set(self, partition))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json

import pytest

from dagster import (
Expand Down Expand Up @@ -104,3 +106,27 @@ def my_job():
DagsterUnknownPartitionError, match="Could not find a partition with key `doesnotexist`"
):
result = my_job.execute_in_process(partition_key="doesnotexist")


def test_dict_partitioned_config_tags():
def partition_fn(_current_time=None):
return ["blah"]

@dynamic_partitioned_config(
partition_fn, tags_for_partition_fn=lambda partition_key: {"foo": {"bar": partition_key}}
)
def my_dynamic_partitioned_config(_partition_key):
return RUN_CONFIG

assert my_dynamic_partitioned_config("") == RUN_CONFIG

@job(config=my_dynamic_partitioned_config)
def my_job():
my_op()

partition_keys = my_dynamic_partitioned_config.get_partition_keys()
assert partition_keys == ["blah"]

result = my_job.execute_in_process(partition_key="blah")
assert result.success
assert result.dagster_run.tags["foo"] == json.dumps({"bar": "blah"})

0 comments on commit a141905

Please sign in to comment.