automatically inject OL info into Glue job in GlueJobOperator #59094
Conversation
Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
Force-pushed from 1dc055a to ebe1f66.
kacpermuda left a comment:
Probably needs some adjustments; I left my opinions.
    parent_job_namespace=lineage_job_namespace(),
    parent_job_name=lineage_job_name(ti),  # type: ignore[arg-type]
    parent_run_id=lineage_run_id(ti),  # type: ignore[arg-type]
    root_parent_job_namespace=lineage_job_namespace(),
We should use lineage_root_job_namespace here. Must have missed that when adding that macro.
class ParentJobInformation(NamedTuple):
    """Container for OpenLineage parent job information."""

    parent_job_namespace: str
    parent_job_name: str
    parent_run_id: str
    root_parent_job_namespace: str
    root_parent_job_name: str
    root_parent_run_id: str


def get_parent_job_information(context: Context) -> ParentJobInformation | None:
    """
    Retrieve parent job information from the Airflow context.
This should probably not live in spark utils, as it's unrelated to spark. But since we do not have the "public api" on the provider yet, I think it's as good as any other place.
Yeah, I wanted to avoid spreading this code across multiple files - once we expand beyond Spark it would make sense to have some inject.py file and use it.
    The parent job information is injected via the --customer-driver-env-vars argument,
    which sets environment variables in the Spark driver process.
    - If OpenLineage provider is not available, skip injection
The OL provider can be disabled and this will still inject, I think. The compat helper can return valid info in that case. Maybe add an additional check for that, like I did here?
airflow.providers.common.compat.openlineage.utils.spark.get_parent_job_information will return None in that case, which causes this function to return the original dict.
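For context, a minimal sketch of that early-return behavior, assuming a stub in place of the compat helper (the real one lives in airflow.providers.common.compat; the bodies here are illustrative only):

```python
from typing import Any


def get_parent_job_information(context: Any):
    # Stand-in for the compat helper: it returns None when the OpenLineage
    # provider is missing or disabled, so no injection happens downstream.
    return None


def inject_parent_job_information_into_glue_script_args(
    script_args: dict, context: Any
) -> dict:
    parent_info = get_parent_job_information(context)
    if parent_info is None:
        # Provider unavailable or disabled: hand back the original dict untouched.
        return script_args
    # ... build env vars from parent_info and merge them here ...
    return script_args
```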
    return ",".join(parts)


def _is_parent_job_info_present_in_glue_env_vars(script_args: dict[str, Any]) -> bool:
I don't see this actually being used anywhere? There is no test that actually checks if the OL variables are not overwritten.
You're right - I changed the concept in the meantime and removed this one.
    for char in env_vars_string:
        if char == '"' and (not current or current[-1] != "\\"):
            in_quotes = not in_quotes
            current += char
        elif char == "," and not in_quotes:
            if "=" in current:
                key, value = current.split("=", 1)
                # Strip surrounding quotes if present
                value = value.strip()
                if value.startswith('"') and value.endswith('"'):
                    value = value[1:-1]
                result[key.strip()] = value
            current = ""
        else:
            current += char

    # Handle last element
    if current and "=" in current:
        key, value = current.split("=", 1)
        value = value.strip()
        if value.startswith('"') and value.endswith('"'):
            value = value[1:-1]
        result[key.strip()] = value
I see a lot of quoting and other edge cases here, but the tests are not really clear on how they are handled (or maybe it's just not clear to me). Looking at this, I'm not sure we handle them all correctly. Can you add more description to the docstring here and/or more tests?
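To make the expectations concrete, here is a self-contained copy of the parsing loop above (wrapped in a hypothetical `parse_env_vars` helper) together with the edge cases worth pinning down in tests - quoted commas and surrounding whitespace:

```python
def parse_env_vars(env_vars_string: str) -> dict[str, str]:
    """Split 'K=V,K2=V2' pairs, honouring double-quoted values containing commas."""
    result: dict[str, str] = {}
    current = ""
    in_quotes = False
    for char in env_vars_string:
        if char == '"' and (not current or current[-1] != "\\"):
            in_quotes = not in_quotes
            current += char
        elif char == "," and not in_quotes:
            if "=" in current:
                key, value = current.split("=", 1)
                value = value.strip()
                # Strip surrounding quotes if present
                if value.startswith('"') and value.endswith('"'):
                    value = value[1:-1]
                result[key.strip()] = value
            current = ""
        else:
            current += char
    # Handle the last element
    if current and "=" in current:
        key, value = current.split("=", 1)
        value = value.strip()
        if value.startswith('"') and value.endswith('"'):
            value = value[1:-1]
        result[key.strip()] = value
    return result
```

With this version, `parse_env_vars('A="x,y",B=2')` keeps the comma inside the quoted value, and whitespace around keys and values is stripped.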
    existing_env_vars_str = script_args.get("--customer-driver-env-vars", "")
    existing_env_vars = _parse_glue_customer_env_vars(existing_env_vars_str)
Why do we need such complex parsing and then formatting? Can't we just check for any "OPENLINEAGE_PARENT/ROOT" substring and, if absent, simply append to the end of the env vars string? I feel like we risk a lot less than when we're parsing and then re-formatting customer vars, and the result is the same.
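A sketch of that substring-check-and-append idea - the function name is hypothetical, and the `OPENLINEAGE_PARENT`/`OPENLINEAGE_ROOT` prefixes are assumed from the comment above:

```python
def append_ol_env_vars(existing: str, ol_vars: dict[str, str]) -> str:
    # If any OpenLineage parent/root variable already appears, leave the
    # user-supplied string untouched rather than parsing and re-formatting it.
    if "OPENLINEAGE_PARENT" in existing or "OPENLINEAGE_ROOT" in existing:
        return existing
    appended = ",".join(f"{key}={value}" for key, value in ol_vars.items())
    return f"{existing},{appended}" if existing else appended
```

The trade-off: this never rewrites the customer's string, at the cost of not deduplicating or normalizing anything inside it.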
    )


def inject_parent_job_information_into_glue_script_args(
I think we should wrap it all in try/except (e.g., how I did here) just to make sure that if any unexpected failure happens we handle it gracefully, return the unmodified params, and log a warning for the user.
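A minimal sketch of that graceful-failure wrapper; the function and parameter names are illustrative, not from the PR:

```python
import copy
import logging

log = logging.getLogger(__name__)


def inject_parent_info_safely(script_args: dict, injector) -> dict:
    """Run *injector* on a copy of script_args; on any failure, log a warning
    and return the original params unmodified."""
    try:
        return injector(copy.deepcopy(script_args))
    except Exception:
        log.warning(
            "Failed to inject OpenLineage parent job information into Glue "
            "script args; leaving them unmodified.",
            exc_info=True,
        )
        return script_args
```

Deep-copying before injection also guarantees the caller's dict is never half-mutated when the injector raises partway through.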
This PR adds a feature where GlueJobOperator automatically injects OpenLineage parent job information into the Spark job properties. This is analogous to previous PRs adding this functionality to many other Spark-related operators, like #44477; more details about this feature are available there.
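End to end, the injection roughly amounts to serializing the ParentJobInformation fields into the --customer-driver-env-vars value. A sketch under stated assumptions: the OPENLINEAGE_* variable names below are guesses for illustration, not necessarily the ones the PR emits.

```python
from typing import NamedTuple


class ParentJobInformation(NamedTuple):
    """Container for OpenLineage parent job information (fields as in the PR)."""

    parent_job_namespace: str
    parent_job_name: str
    parent_run_id: str
    root_parent_job_namespace: str
    root_parent_job_name: str
    root_parent_run_id: str


def build_driver_env_vars(info: ParentJobInformation) -> str:
    # Assumed env var names; the resulting string becomes the value of the
    # --customer-driver-env-vars Glue job argument.
    mapping = {
        "OPENLINEAGE_PARENT_JOB_NAMESPACE": info.parent_job_namespace,
        "OPENLINEAGE_PARENT_JOB_NAME": info.parent_job_name,
        "OPENLINEAGE_PARENT_RUN_ID": info.parent_run_id,
        "OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE": info.root_parent_job_namespace,
        "OPENLINEAGE_ROOT_PARENT_JOB_NAME": info.root_parent_job_name,
        "OPENLINEAGE_ROOT_PARENT_RUN_ID": info.root_parent_run_id,
    }
    return ",".join(f"{key}={value}" for key, value in mapping.items())
```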