Add Databricks query tags for DatabricksSqlOperator and DatabricksCopyIntoOperator#66886
Shaan-alpha wants to merge 4 commits into
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide
Refactored query-tag injection into shared helper utilities to reduce duplicated logic and improve maintainability across operators.
aaacdcb to e072ad1
SameerMesiah97 left a comment
I have left a few comments.
    if "run_id" in context and context["run_id"]:
        tags.append(f"airflow_run_id:{_escape_query_tag_value(context['run_id'])}")

    return ",".join(tags)
I think this could be made clearer and more explicit via a mapping-driven approach. Please see below for guidance:

_QUERY_TAG_FIELDS = {
    "airflow_dag_id": ("dag", "dag_id"),
    "airflow_task_id": ("task", "task_id"),
    "airflow_run_id": ("run_id", None),
}

def _format_query_tags(context: Context) -> str:
    tags = []
    for tag_name, (context_key, attr) in _QUERY_TAG_FIELDS.items():
        value = context.get(context_key)
        if attr:
            value = getattr(value, attr, None)
        if value:
            tags.append(f"{tag_name}:{_escape_query_tag_value(value)}")
    return ",".join(tags)

Also, you could do the same for _escape_query_tag_value:

_QUERY_TAG_ESCAPE_SEQUENCES = {
    "\\": "\\\\",
    ",": "\\,",
    ":": "\\:",
}

def _escape_query_tag_value(value: str) -> str:
    """Escape Databricks query-tag separator characters in a tag value."""
    escaped = str(value)
    for char, replacement in _QUERY_TAG_ESCAPE_SEQUENCES.items():
        escaped = escaped.replace(char, replacement)
    return escaped
Thanks for the suggestion @SameerMesiah97! I agree the mapping-driven approach is cleaner and more maintainable. I've gone ahead and refactored both _format_query_tags and _escape_query_tag_value to use the _QUERY_TAG_FIELDS and _QUERY_TAG_ESCAPE_SEQUENCES mappings exactly as you suggested — this is included in commit 732df03 ("Refactor Databricks query tag helper utilities"). Please take another look and let me know if you'd like any further tweaks.
    def execute(self, context: Context) -> Any:
        _inject_query_tags(self.get_db_hook(), context)
        return super().execute(context)
Do we want this behavior configurable (operator/provider-level opt-out)? Since this mutates session_configuration automatically, some users may prefer explicit control over injected warehouse metadata.
Good point — making this configurable makes sense so users retain explicit control over session_configuration. I'll add an inject_query_tags: bool = True parameter to both DatabricksSqlOperator and DatabricksCopyIntoOperator (defaulting to True to preserve the observability benefit, while allowing easy opt-out). I'll also document the new parameter in the operator docstrings. Will push the update shortly.
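A minimal, self-contained sketch of how that opt-out could look. `_BaseSqlOperator` and the inlined `_inject_query_tags` are stand-ins for the real Airflow base class and helper, not the merged implementation, and the merge logic is simplified to a single tag.

```python
class _BaseSqlOperator:
    """Stand-in for the real SQL operator base class."""

    def execute(self, context):
        return "executed"


def _inject_query_tags(session_configuration, context):
    # Merge Airflow metadata into query_tags, preserving user-defined tags.
    existing = session_configuration.get("query_tags", "")
    airflow_tags = f"airflow_run_id:{context['run_id']}"
    session_configuration["query_tags"] = (
        f"{existing},{airflow_tags}" if existing else airflow_tags
    )


class DatabricksSqlOperator(_BaseSqlOperator):
    def __init__(self, session_configuration=None, inject_query_tags=True):
        # Default True keeps the observability benefit; False leaves
        # session_configuration untouched.
        self.session_configuration = session_configuration or {}
        self.inject_query_tags = inject_query_tags

    def execute(self, context):
        if self.inject_query_tags:
            _inject_query_tags(self.session_configuration, context)
        return super().execute(context)


op = DatabricksSqlOperator(session_configuration={"query_tags": "team:data"})
op.execute({"run_id": "manual__2024"})
print(op.session_configuration["query_tags"])  # team:data,airflow_run_id:manual__2024
```

With `inject_query_tags=False`, `execute` never touches `session_configuration`, so users keep full control over what reaches the warehouse.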
    }
    assert result.job_facets == {"sql": SQLJobFacet(query=op._sql)}
I think this test should be moved above the OpenLineage tests as it is closer to core execution logic. Also, it only covers the happy path. Could we add coverage for empty/partial context, empty existing query_tags, and values containing commas/colons/backslashes?
The escaping path is probably the most important one here, since _escape_query_tag_value() is not really exercised end-to-end right now. It would also be good to verify that unrelated session_configuration values are preserved after the merge.
Thanks, that ordering makes sense. I'll move test_query_tags_injection above the OpenLineage tests so it sits closer to the core execution logic. I'll also expand coverage to include: empty/partial context (missing dag/task/run_id), empty existing query_tags, values containing commas/colons/backslashes (to exercise _escape_query_tag_value end-to-end), and an assertion that unrelated keys in session_configuration are preserved after the merge. Will include in the next push.
    assert bucket == "my-bucket"
    assert object_name == "path/to/file.parquet"
I would move this test after test_exec_write_gcs_parquet_output. Also, the above comment regarding incomplete test coverage applies here too.
Will do — I'll move the new test to sit right after test_exec_write_gcs_parquet_output and mirror the same expanded coverage (empty/partial context, empty existing query_tags, escape-character values, and preservation of unrelated session_configuration keys). Thanks for the review!
Address review feedback on apache#66886:
- Add `inject_query_tags: bool = True` parameter to DatabricksSqlOperator and DatabricksCopyIntoOperator, allowing users to opt out of the automatic session_configuration mutation while preserving the default observability benefit. Documented in both operator docstrings.
- Move query-tag tests above the OpenLineage block (test_databricks_copy) and after test_exec_write_gcs_parquet_output (test_databricks_sql) so they sit closer to core execution logic.
- Expand coverage in both test files: empty/partial context, empty existing query_tags, end-to-end exercise of escape sequences for commas/colons/backslashes, preservation of unrelated session_configuration keys, fallback to connection extras when session_config is None, and verification of the opt-out path.
@SameerMesiah97 thanks again for the review — pushed
Comment 1 (mapping-driven helpers) was already addressed in
Let's wait for a maintainer to trigger CI.
38d58f8 to 8768dca
Follow-up improvements planned based on review and maintainability considerations:
This PR intentionally preserves backward compatibility by:
…QL operators

This PR injects the Airflow dag_id, task_id, and run_id into the session_configuration parameter of DatabricksSqlOperator and DatabricksCopyIntoOperator. This enhances observability on the Databricks side. The tags are safely escaped and user-defined query tags are preserved.

Closes apache#66839
Refactor query tag formatting and escaping into mapping-driven helper utilities for maintainability and extensibility.
Address review feedback on apache#66886
d39b2fd to d425a49
Description
This PR addresses issue #66839 by injecting Airflow context metadata (dag_id, task_id, and run_id) into the session_configuration parameter as query_tags during the execution of DatabricksSqlOperator and DatabricksCopyIntoOperator.
By automatically populating query tags, it significantly enhances observability on the Databricks side, allowing administrators and developers to trace specific queries executed in Databricks SQL directly back to the Airflow task run that spawned them.
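Concretely, the injected metadata ends up alongside any user-supplied tags in `session_configuration`. The sketch below uses invented sample values and omits escaping, since none of these values contain commas, colons, or backslashes; the merge shown is illustrative of the behavior described above, not the PR's exact code.

```python
# User-supplied session configuration before execution.
session_configuration = {"query_tags": "team:analytics"}

# Illustrative Airflow context values.
dag_id, task_id, run_id = "daily_etl", "copy_into_bronze", "scheduled__2024-01-01"
airflow_tags = ",".join(
    [
        f"airflow_dag_id:{dag_id}",
        f"airflow_task_id:{task_id}",
        f"airflow_run_id:{run_id}",
    ]
)

# Merge: user-defined tags are preserved, Airflow tags are appended.
existing = session_configuration.get("query_tags")
session_configuration["query_tags"] = (
    f"{existing},{airflow_tags}" if existing else airflow_tags
)
print(session_configuration["query_tags"])
# team:analytics,airflow_dag_id:daily_etl,airflow_task_id:copy_into_bronze,airflow_run_id:scheduled__2024-01-01
```

On the Databricks side, these tags then let an administrator filter query history down to the exact DAG, task, and run that issued a statement.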
Key Changes:
Related Issues
Testing