From b00c4099a7d43e3ccfd574c0c1f4234bac860264 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Tue, 7 May 2024 22:33:49 -0700
Subject: [PATCH 1/4] feat(ingest/airflow): refactor sql extractor into two methods

---
 .../src/datahub_airflow_plugin/_extractors.py | 35 ++++++++++++++-----
 1 file changed, 27 insertions(+), 8 deletions(-)

diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py
index 197ae5298aa83..3f1aeb77b3d4d 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py
@@ -78,7 +78,7 @@ def _patch_extractors(self):
             unittest.mock.patch.object(
                 SnowflakeExtractor,
                 "default_schema",
-                property(snowflake_default_schema),
+                property(_snowflake_default_schema),
             )
         )
 
@@ -166,12 +166,6 @@ def _sql_extractor_extract(self: "SqlExtractor") -> TaskMetadata:
     task_name = f"{self.operator.dag_id}.{self.operator.task_id}"
     sql = self.operator.sql
 
-    run_facets = {}
-    job_facets = {"sql": SqlJobFacet(query=self._normalize_sql(sql))}
-
-    # Prepare to run the SQL parser.
-    graph = self.context.get(_DATAHUB_GRAPH_CONTEXT_KEY, None)
-
     default_database = getattr(self.operator, "database", None)
     if not default_database:
         default_database = self.database
@@ -185,6 +179,31 @@ def _sql_extractor_extract(self: "SqlExtractor") -> TaskMetadata:
     # Run the SQL parser.
     scheme = self.scheme
     platform = OL_SCHEME_TWEAKS.get(scheme, scheme)
+
+    return _parse_sql_into_task_metadata(
+        self,
+        sql,
+        platform=platform,
+        default_database=default_database,
+        default_schema=default_schema,
+    )
+
+
+def _parse_sql_into_task_metadata(
+    self: "BaseExtractor",
+    sql: str,
+    platform: str,
+    default_database: Optional[str],
+    default_schema: Optional[str],
+) -> TaskMetadata:
+    task_name = f"{self.operator.dag_id}.{self.operator.task_id}"
+
+    run_facets = {}
+    job_facets = {"sql": SqlJobFacet(query=self._normalize_sql(sql))}
+
+    # Prepare to run the SQL parser.
+    graph = self.context.get(_DATAHUB_GRAPH_CONTEXT_KEY, None)
+
     self.log.debug(
         "Running the SQL parser %s (platform=%s, default db=%s, schema=%s): %s",
         "with graph client" if graph else "in offline mode",
@@ -232,7 +251,7 @@ def _sql_extractor_extract(self: "SqlExtractor") -> TaskMetadata:
     )
 
 
-def snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]:
+def _snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]:
     if hasattr(self.operator, "schema") and self.operator.schema is not None:
         return self.operator.schema
     return (

From e3f3c20168f69fb24e6efeba240a3f3e13fc76df Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Tue, 7 May 2024 22:44:11 -0700
Subject: [PATCH 2/4] support BigQueryInsertJobOperator

---
 docs/lineage/airflow.md                       |  3 ++-
 .../src/datahub_airflow_plugin/_extractors.py | 25 +++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md
index d501ea407c072..87a305a9655b9 100644
--- a/docs/lineage/airflow.md
+++ b/docs/lineage/airflow.md
@@ -8,7 +8,7 @@ If you're looking to schedule DataHub ingestion using Airflow, see the guide on
 
 The DataHub Airflow plugin supports:
 
-- Automatic column-level lineage extraction from various operators e.g. SQL operators (including `MySqlOperator`, `PostgresOperator`, `SnowflakeOperator`, and more), `S3FileTransformOperator`, and more.
+- Automatic column-level lineage extraction from various operators, e.g. SQL operators (including `MySqlOperator`, `PostgresOperator`, `SnowflakeOperator`, `BigQueryInsertJobOperator`, and more), `S3FileTransformOperator`, and more.
 - Airflow DAG and tasks, including properties, ownership, and tags.
 - Task run information, including task successes and failures.
 - Manual lineage annotations using `inlets` and `outlets` on Airflow operators.
@@ -166,6 +166,7 @@ Supported operators:
 - `SQLExecuteQueryOperator`, including any subclasses. Note that in newer versions of Airflow (generally Airflow 2.5+), most SQL operators inherit from this class.
 - `AthenaOperator` and `AWSAthenaOperator`
 - `BigQueryOperator` and `BigQueryExecuteQueryOperator`
+- `BigQueryInsertJobOperator` (incubating)
 - `MySqlOperator`
 - `PostgresOperator`
 - `RedshiftSQLOperator`
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py
index 3f1aeb77b3d4d..b4a28daa75171 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py
@@ -59,6 +59,10 @@ def __init__(self):
         for operator in _sql_operator_overrides:
             self.task_to_extractor.extractors[operator] = GenericSqlExtractor
 
+        self.task_to_extractor.extractors["BigQueryInsertJobOperator"] = (
+            BigQueryInsertJobOperatorExtractor
+        )
+
         self._graph: Optional["DataHubGraph"] = None
 
     @contextlib.contextmanager
@@ -251,6 +255,27 @@ def _parse_sql_into_task_metadata(
     )
 
 
+class BigQueryInsertJobOperatorExtractor(BaseExtractor):
+    def extract(self) -> Optional[TaskMetadata]:
+        from airflow.providers.google.cloud.operators.bigquery import (
+            BigQueryInsertJobOperator,
+        )
+
+        operator: "BigQueryInsertJobOperator" = self.operator
+        sql = operator.configuration.get("query")
+        if not sql:
+            self.log.warning("No query found in BigQueryInsertJobOperator")
+            return None
+
+        return _parse_sql_into_task_metadata(
+            self,
+            sql,
+            platform="bigquery",
+            default_database=operator.project_id,
+            default_schema=None,
+        )
+
+
 def _snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]:
     if hasattr(self.operator, "schema") and self.operator.schema is not None:
         return self.operator.schema
     return (

From 8b1ee09ab44f61c0762ae28f5f70f272088605d3 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Tue, 7 May 2024 22:45:19 -0700
Subject: [PATCH 3/4] tweak

---
 .../airflow-plugin/src/datahub_airflow_plugin/_extractors.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py
index b4a28daa75171..39f6aca4ca57e 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py
@@ -258,7 +258,7 @@ def _parse_sql_into_task_metadata(
 class BigQueryInsertJobOperatorExtractor(BaseExtractor):
     def extract(self) -> Optional[TaskMetadata]:
         from airflow.providers.google.cloud.operators.bigquery import (
-            BigQueryInsertJobOperator,
+            BigQueryInsertJobOperator,  # type: ignore
        )
 
         operator: "BigQueryInsertJobOperator" = self.operator
From 2c4f2bb513b3d6be1ffa2f98fabeabc600183cbb Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Tue, 7 May 2024 22:50:26 -0700
Subject: [PATCH 4/4] fix lint

---
 docs/lineage/airflow.md                       | 8 ++++++++
 .../src/datahub_airflow_plugin/_extractors.py | 6 +++---
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md
index 87a305a9655b9..f0952309c328a 100644
--- a/docs/lineage/airflow.md
+++ b/docs/lineage/airflow.md
@@ -225,6 +225,14 @@ class DbtOperator(BaseOperator):
 
 If you override the `pre_execute` and `post_execute` function, ensure they include the `@prepare_lineage` and `@apply_lineage` decorators respectively. Reference the [Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html#lineage) for more details.
 
+### Custom Extractors
+
+Note: custom extractors are only supported in the v2 plugin.
+
+You can also create a custom extractor to extract lineage from any operator. This is useful if you're using a built-in Airflow operator for which we don't support automatic lineage extraction.
+
+See this [example PR](https://github.com/datahub-project/datahub/pull/10452), which adds a custom extractor for the `BigQueryInsertJobOperator` operator.
+
 ## Emit Lineage Directly
 
 If you can't use the plugin or annotate inlets/outlets, you can also emit lineage using the `DatahubEmitterOperator`.
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py
index 39f6aca4ca57e..f91c77591d35b 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py
@@ -59,9 +59,9 @@ def __init__(self):
         for operator in _sql_operator_overrides:
             self.task_to_extractor.extractors[operator] = GenericSqlExtractor
 
-        self.task_to_extractor.extractors["BigQueryInsertJobOperator"] = (
-            BigQueryInsertJobOperatorExtractor
-        )
+        self.task_to_extractor.extractors[
+            "BigQueryInsertJobOperator"
+        ] = BigQueryInsertJobOperatorExtractor
 
         self._graph: Optional["DataHubGraph"] = None
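
A usage sketch, not part of the patches above: the new extractor is dispatched purely on the operator's class name, so existing DAGs need no changes once the updated plugin is installed. The DAG id, project, dataset, and table names below are hypothetical placeholders.

```python
# Hypothetical DAG exercising BigQueryInsertJobOperator; the CTAS query below
# is the kind of SQL the new extractor hands to DataHub's SQL parser.
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
    dag_id="bq_insert_job_lineage_demo",
    start_date=datetime(2024, 5, 1),
    schedule=None,
) as dag:
    create_summary = BigQueryInsertJobOperator(
        task_id="create_summary",
        project_id="my-gcp-project",  # hypothetical; the extractor uses it as default_database
        configuration={
            "query": {
                "query": (
                    "CREATE OR REPLACE TABLE demo_dataset.event_summary AS "
                    "SELECT user_id, COUNT(*) AS event_count "
                    "FROM demo_dataset.events GROUP BY user_id"
                ),
                "useLegacySql": False,
            }
        },
    )
```

One thing worth double-checking in review: in the standard BigQuery jobs-API shape shown above, `configuration["query"]` is itself a dict whose `"query"` key holds the SQL string, while the extractor reads `operator.configuration.get("query")` directly, so the nested form may need unwrapping.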
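
And since the new docs section points readers at this PR as the template for custom extractors, here is a rough sketch of what an out-of-tree extractor following the same pattern could look like. `MyCustomSqlOperator` and its `custom_sql` attribute are hypothetical stand-ins, the platform value is only an example, and `_parse_sql_into_task_metadata` is a private helper introduced in patch 1, not a stable API.

```python
# Sketch of a custom OpenLineage-style extractor modeled on
# BigQueryInsertJobOperatorExtractor from this series. Names marked as
# hypothetical should be replaced with your operator's real ones.
from typing import List, Optional

from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata


class MyCustomSqlOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        # Extractors are dispatched by operator class name.
        return ["MyCustomSqlOperator"]  # hypothetical operator

    def extract(self) -> Optional[TaskMetadata]:
        # `custom_sql` is a hypothetical attribute carrying the task's SQL.
        sql = getattr(self.operator, "custom_sql", None)
        if not sql:
            self.log.warning("No SQL found on MyCustomSqlOperator")
            return None

        # Private helper from this PR; it runs DataHub's SQL parser and maps
        # the resulting lineage into an OpenLineage TaskMetadata.
        from datahub_airflow_plugin._extractors import _parse_sql_into_task_metadata

        return _parse_sql_into_task_metadata(
            self,
            sql,
            platform="postgres",  # example platform name
            default_database=None,
            default_schema=None,
        )
```

In the underlying openlineage-airflow framework, out-of-tree extractors are typically registered by listing their import paths in the `OPENLINEAGE_EXTRACTORS` environment variable.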