diff --git a/airflow-core/src/airflow/serialization/definitions/param.py b/airflow-core/src/airflow/serialization/definitions/param.py
index 12470c5fdd25f..ff459020a357c 100644
--- a/airflow-core/src/airflow/serialization/definitions/param.py
+++ b/airflow-core/src/airflow/serialization/definitions/param.py
@@ -20,6 +20,7 @@
import collections.abc
import copy
+import re
from typing import TYPE_CHECKING, Any, Literal
from airflow.serialization.definitions.notset import NOTSET, is_arg_set
@@ -27,6 +28,36 @@
if TYPE_CHECKING:
from collections.abc import Iterator, Mapping
+# Matches ISO 8601 duration strings such as PT15M, P1Y2M3DT4H5M6S, P1W, P1DT30S.
+# Decimal fractions with either "." or "," are allowed on any component (e.g. PT1.5H, PT30.5S).
+_ISO8601_DURATION_RE = re.compile(
+ r"^P"
+ r"(?:\d+(?:[.,]\d+)?Y)?"
+ r"(?:\d+(?:[.,]\d+)?M)?"
+ r"(?:\d+(?:[.,]\d+)?W)?"
+ r"(?:\d+(?:[.,]\d+)?D)?"
+ r"(?:T(?:\d+(?:[.,]\d+)?H)?(?:\d+(?:[.,]\d+)?M)?(?:\d+(?:[.,]\d+)?S)?)?"
+ r"$"
+)
+
+
+def _check_iso8601_duration(instance: str) -> bool:
+ """Validate an ISO 8601 duration string. Used as a jsonschema format checker."""
+ if not isinstance(instance, str):
+ return True
+ if not _ISO8601_DURATION_RE.fullmatch(instance) or instance in ("P", "PT"):
+ raise ValueError(f"{instance!r} is not a valid ISO 8601 duration")
+ return True
+
+
+def _make_format_checker() -> Any:
+ """Return a FormatChecker that includes our ISO 8601 duration format checker."""
+ import jsonschema
+
+ checker = jsonschema.FormatChecker()
+ checker.checks("duration", raises=ValueError)(_check_iso8601_duration)
+ return checker
+
class SerializedParam:
"""Server-side param class for deserialization."""
@@ -60,7 +91,7 @@ def resolve(self, *, raises: bool = False) -> Any:
try:
if not is_arg_set(value := self.value):
raise ValueError("No value passed")
- jsonschema.validate(value, self.schema, format_checker=jsonschema.FormatChecker())
+ jsonschema.validate(value, self.schema, format_checker=_make_format_checker())
except Exception:
if not raises:
return None
diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/components.json
index 67e348ca11e35..c220682cae5c1 100644
--- a/airflow-core/src/airflow/ui/public/i18n/locales/en/components.json
+++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/components.json
@@ -71,6 +71,7 @@
"files_other": "{{count}} files"
},
"flexibleForm": {
+ "durationPlaceholder": "e.g. PT15M",
"placeholder": "Select Value",
"placeholderArray": "Enter each string on a new line",
"placeholderExamples": "Start typing to see options",
diff --git a/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldDuration.tsx b/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldDuration.tsx
new file mode 100644
index 0000000000000..4d56b76e5a6a0
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldDuration.tsx
@@ -0,0 +1,54 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Input } from "@chakra-ui/react";
+import { useTranslation } from "react-i18next";
+
+import { paramPlaceholder, useParamStore } from "src/queries/useParamStore";
+
+import type { FlexibleFormElementProps } from ".";
+
+export const FieldDuration = ({ name, namespace = "default", onUpdate }: FlexibleFormElementProps) => {
+ const { t: translate } = useTranslation("components");
+ const { disabled, paramsDict, setParamsDict } = useParamStore(namespace);
+ const param = paramsDict[name] ?? paramPlaceholder;
+ const handleChange = (value: string) => {
+ if (paramsDict[name]) {
+ // "undefined" values are removed from params, so we set it to null to avoid falling back to DAG defaults.
+ // eslint-disable-next-line unicorn/no-null
+ paramsDict[name].value = value === "" ? null : value;
+ }
+
+ setParamsDict(paramsDict);
+ onUpdate(value);
+ };
+
+ return (
+ {
+ handleChange(event.target.value);
+ }}
+ placeholder={translate("flexibleForm.durationPlaceholder")}
+ size="sm"
+ value={(param.value ?? "") as string}
+ />
+ );
+};
diff --git a/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldSelector.tsx b/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldSelector.tsx
index 716b7c48be051..0ff519b6dfd43 100644
--- a/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldSelector.tsx
+++ b/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldSelector.tsx
@@ -24,6 +24,7 @@ import { FieldAdvancedArray } from "./FieldAdvancedArray";
import { FieldBool } from "./FieldBool";
import { FieldDateTime } from "./FieldDateTime";
import { FieldDropdown } from "./FieldDropdown";
+import { FieldDuration } from "./FieldDuration";
import { FieldMultiSelect } from "./FieldMultiSelect";
import { FieldMultilineText } from "./FieldMultilineText";
import { FieldNumber } from "./FieldNumber";
@@ -71,6 +72,9 @@ const enumTypes = ["null", "string", "number", "integer"];
const isFieldDropdown = (fieldType: string, fieldSchema: ParamSchema) =>
enumTypes.includes(fieldType) && Array.isArray(fieldSchema.enum);
+const isFieldDuration = (fieldType: string, fieldSchema: ParamSchema) =>
+ fieldType === "string" && fieldSchema.format === "duration";
+
const isFieldMultilineText = (fieldType: string, fieldSchema: ParamSchema) =>
fieldType === "string" && fieldSchema.format === "multiline";
@@ -127,6 +131,8 @@ export const FieldSelector = ({ name, namespace = "default", onUpdate }: Flexibl
return ;
} else if (isFieldNumber(fieldType)) {
return ;
+ } else if (isFieldDuration(fieldType, param.schema)) {
+ return ;
} else if (isFieldMultilineText(fieldType, param.schema)) {
return ;
} else {
diff --git a/task-sdk/src/airflow/sdk/definitions/param.py b/task-sdk/src/airflow/sdk/definitions/param.py
index d1174ec561bad..debe1a59a4b8a 100644
--- a/task-sdk/src/airflow/sdk/definitions/param.py
+++ b/task-sdk/src/airflow/sdk/definitions/param.py
@@ -20,6 +20,7 @@
import copy
import json
import logging
+import re
from collections.abc import ItemsView, Iterable, Mapping, MutableMapping, ValuesView
from typing import TYPE_CHECKING, Any, ClassVar, Literal
@@ -34,6 +35,36 @@
logger = logging.getLogger(__name__)
+# Matches ISO 8601 duration strings such as PT15M, P1Y2M3DT4H5M6S, P1W, P1DT30S.
+# Decimal fractions with either "." or "," are allowed on any component (e.g. PT1.5H, PT30.5S).
+_ISO8601_DURATION_RE = re.compile(
+ r"^P"
+ r"(?:\d+(?:[.,]\d+)?Y)?"
+ r"(?:\d+(?:[.,]\d+)?M)?"
+ r"(?:\d+(?:[.,]\d+)?W)?"
+ r"(?:\d+(?:[.,]\d+)?D)?"
+ r"(?:T(?:\d+(?:[.,]\d+)?H)?(?:\d+(?:[.,]\d+)?M)?(?:\d+(?:[.,]\d+)?S)?)?"
+ r"$"
+)
+
+
+def _check_iso8601_duration(instance: str) -> bool:
+ """Validate an ISO 8601 duration string. Used as a jsonschema format checker."""
+ if not isinstance(instance, str):
+ return True
+ if not _ISO8601_DURATION_RE.fullmatch(instance) or instance in ("P", "PT"):
+ raise ValueError(f"{instance!r} is not a valid ISO 8601 duration")
+ return True
+
+
+def _make_format_checker() -> Any:
+ """Return a FormatChecker that includes our ISO 8601 duration format checker."""
+ from jsonschema import FormatChecker
+
+ checker = FormatChecker()
+ checker.checks("duration", raises=ValueError)(_check_iso8601_duration)
+ return checker
+
class Param:
"""
@@ -96,7 +127,6 @@ def resolve(self, value: Any = NOTSET, suppress_exception: bool = False) -> Any:
If true and validations fails, the return value would be None.
"""
import jsonschema
- from jsonschema import FormatChecker
from jsonschema.exceptions import ValidationError
if value is not NOTSET:
@@ -107,7 +137,7 @@ def resolve(self, value: Any = NOTSET, suppress_exception: bool = False) -> Any:
return None
raise ParamValidationError("No value passed and Param has no default value")
try:
- jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
+ jsonschema.validate(final_val, self.schema, format_checker=_make_format_checker())
except ValidationError as err:
if suppress_exception:
return None
diff --git a/task-sdk/tests/task_sdk/definitions/test_param.py b/task-sdk/tests/task_sdk/definitions/test_param.py
index 887003d13dc0a..0dc18eb470a85 100644
--- a/task-sdk/tests/task_sdk/definitions/test_param.py
+++ b/task-sdk/tests/task_sdk/definitions/test_param.py
@@ -138,6 +138,41 @@ def test_string_date_format_error(self, date_string):
with pytest.raises(ParamValidationError, match="is not a 'date'"):
Param(date_string, type="string", format="date").resolve()
+ @pytest.mark.parametrize(
+ "duration",
+ [
+ pytest.param("PT15M", id="minutes-only"),
+ pytest.param("P1Y", id="years-only"),
+ pytest.param("P1W", id="weeks-only"),
+ pytest.param("P1D", id="days-only"),
+ pytest.param("PT1H", id="hours-only"),
+ pytest.param("PT30S", id="seconds-only"),
+ pytest.param("P1DT2H", id="days-and-hours"),
+ pytest.param("P1Y2M3DT4H5M6S", id="full-duration"),
+ pytest.param("PT1.5H", id="fractional-hours-dot"),
+ pytest.param("PT1,5H", id="fractional-hours-comma"),
+ ],
+ )
+ def test_string_duration_format(self, duration):
+ """Test valid ISO 8601 duration strings."""
+ assert Param(duration, type="string", format="duration").resolve() == duration
+
+ @pytest.mark.parametrize(
+ "duration",
+ [
+ pytest.param("P", id="bare-P"),
+ pytest.param("PT", id="bare-PT"),
+ pytest.param("invalid", id="plain-text"),
+ pytest.param("15M", id="missing-P-prefix"),
+ pytest.param("P-1Y", id="negative-value"),
+ pytest.param("1Y2M", id="no-P-prefix"),
+ ],
+ )
+ def test_string_duration_format_error(self, duration):
+ """Test invalid ISO 8601 duration strings."""
+ with pytest.raises(ParamValidationError, match="is not a 'duration'"):
+ Param(duration, type="string", format="duration").resolve()
+
def test_int_param(self):
p = Param(5)
assert p.resolve() == 5