feat(ingest/airflow): support BigQueryInsertJobOperator #10452

Merged 4 commits on May 15, 2024.
docs/lineage/airflow.md (11 changes: 10 additions & 1 deletion)

@@ -8,7 +8,7 @@ If you're looking to schedule DataHub ingestion using Airflow, see the guide on

The DataHub Airflow plugin supports:

- - Automatic column-level lineage extraction from various operators e.g. SQL operators (including `MySqlOperator`, `PostgresOperator`, `SnowflakeOperator`, and more), `S3FileTransformOperator`, and more.
+ - Automatic column-level lineage extraction from various operators, e.g. SQL operators (including `MySqlOperator`, `PostgresOperator`, `SnowflakeOperator`, `BigQueryInsertJobOperator`, and more) and `S3FileTransformOperator`.
- Airflow DAG and tasks, including properties, ownership, and tags.
- Task run information, including task successes and failures.
- Manual lineage annotations using `inlets` and `outlets` on Airflow operators.
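As an illustration of the manual annotation route, here is a minimal sketch. It assumes the plugin's `datahub_airflow_plugin.entities.Dataset` helper and illustrative Snowflake table names; adapt the platform and names to your environment:

```python
from airflow.operators.bash import BashOperator

from datahub_airflow_plugin.entities import Dataset

transform = BashOperator(
    task_id="transform",
    bash_command="echo 'transforming data'",
    # Declare lineage by hand: upstream tables as inlets, downstream as outlets.
    inlets=[Dataset(platform="snowflake", name="mydb.schema.tableA")],
    outlets=[Dataset(platform="snowflake", name="mydb.schema.tableB")],
)
```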
@@ -166,6 +166,7 @@ Supported operators:
- `SQLExecuteQueryOperator`, including any subclasses. Note that in newer versions of Airflow (generally Airflow 2.5+), most SQL operators inherit from this class.
- `AthenaOperator` and `AWSAthenaOperator`
- `BigQueryOperator` and `BigQueryExecuteQueryOperator`
- `BigQueryInsertJobOperator` (incubating)
- `MySqlOperator`
- `PostgresOperator`
- `RedshiftSQLOperator`
@@ -224,6 +225,14 @@ class DbtOperator(BaseOperator):

If you override the `pre_execute` and `post_execute` function, ensure they include the `@prepare_lineage` and `@apply_lineage` decorators respectively. Reference the [Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html#lineage) for more details.
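A minimal sketch of that pattern (the `_get_lineage` helper is hypothetical; `prepare_lineage` and `apply_lineage` come from Airflow's built-in lineage module):

```python
from airflow.lineage import apply_lineage, prepare_lineage
from airflow.models.baseoperator import BaseOperator


class DbtOperator(BaseOperator):
    @prepare_lineage
    def pre_execute(self, context):
        # Populate inlets/outlets before the task runs so the
        # lineage machinery can pick them up.
        self.inlets, self.outlets = self._get_lineage()  # hypothetical helper
        super().pre_execute(context)

    @apply_lineage
    def post_execute(self, context, result=None):
        # Propagate the annotated lineage once the task finishes.
        super().post_execute(context, result)
```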

### Custom Extractors

Note: these are only supported in the v2 plugin.

You can also create a custom extractor to extract lineage from any operator. This is useful if you're using a built-in Airflow operator for which we don't support automatic lineage extraction.

See this [example PR](https://github.com/datahub-project/datahub/pull/10452), which adds a custom extractor for `BigQueryInsertJobOperator`.
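In outline, a custom extractor subclasses OpenLineage's `BaseExtractor`, declares the operator class names it handles, and returns `TaskMetadata` from `extract()`. A hedged sketch (the operator name and its `sql` attribute are assumptions for illustration):

```python
from typing import List, Optional

from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata


class MyOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        # Bare class names (not import paths) of the operators to handle.
        return ["MyOperator"]

    def extract(self) -> Optional[TaskMetadata]:
        # Translate whatever the operator knows about its work into
        # OpenLineage TaskMetadata (inputs, outputs, facets).
        sql = getattr(self.operator, "sql", None)  # assumed attribute
        if not sql:
            return None
        return TaskMetadata(name=f"{self.operator.dag_id}.{self.operator.task_id}")
```

The PR's own extractor (shown in the diff below) follows this shape, but registers itself directly on the plugin's `task_to_extractor` mapping rather than through OpenLineage's `OPENLINEAGE_EXTRACTORS` environment variable.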

## Emit Lineage Directly

If you can't use the plugin or annotate inlets/outlets, you can also emit lineage using the `DatahubEmitterOperator`.
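A minimal sketch, assuming a DataHub REST connection registered in Airflow under the id `datahub_rest_default` and illustrative Snowflake table names:

```python
import datahub.emitter.mce_builder as builder

from datahub_airflow_plugin.operators.datahub import DatahubEmitterOperator

emit_lineage = DatahubEmitterOperator(
    task_id="emit_lineage",
    datahub_conn_id="datahub_rest_default",  # assumed Airflow connection id
    mces=[
        builder.make_lineage_mce(
            upstream_urns=[builder.make_dataset_urn("snowflake", "mydb.schema.tableA")],
            downstream_urn=builder.make_dataset_urn("snowflake", "mydb.schema.tableB"),
        )
    ],
)
```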
---

@@ -59,6 +59,10 @@ def __init__(self):
        for operator in _sql_operator_overrides:
            self.task_to_extractor.extractors[operator] = GenericSqlExtractor

        self.task_to_extractor.extractors[
            "BigQueryInsertJobOperator"
        ] = BigQueryInsertJobOperatorExtractor

        self._graph: Optional["DataHubGraph"] = None

    @contextlib.contextmanager
@@ -78,7 +82,7 @@ def _patch_extractors(self):
            unittest.mock.patch.object(
                SnowflakeExtractor,
                "default_schema",
-                property(snowflake_default_schema),
+                property(_snowflake_default_schema),
            )
)

@@ -166,12 +170,6 @@ def _sql_extractor_extract(self: "SqlExtractor") -> TaskMetadata:
    task_name = f"{self.operator.dag_id}.{self.operator.task_id}"
    sql = self.operator.sql

-    run_facets = {}
-    job_facets = {"sql": SqlJobFacet(query=self._normalize_sql(sql))}
-
-    # Prepare to run the SQL parser.
-    graph = self.context.get(_DATAHUB_GRAPH_CONTEXT_KEY, None)
-
    default_database = getattr(self.operator, "database", None)
    if not default_database:
        default_database = self.database
@@ -185,6 +183,31 @@ def _sql_extractor_extract(self: "SqlExtractor") -> TaskMetadata:
    # Run the SQL parser.
    scheme = self.scheme
    platform = OL_SCHEME_TWEAKS.get(scheme, scheme)

    return _parse_sql_into_task_metadata(
        self,
        sql,
        platform=platform,
        default_database=default_database,
        default_schema=default_schema,
    )


def _parse_sql_into_task_metadata(
    self: "BaseExtractor",
    sql: str,
    platform: str,
    default_database: Optional[str],
    default_schema: Optional[str],
) -> TaskMetadata:
    task_name = f"{self.operator.dag_id}.{self.operator.task_id}"

    run_facets = {}
    job_facets = {"sql": SqlJobFacet(query=self._normalize_sql(sql))}

    # Prepare to run the SQL parser.
    graph = self.context.get(_DATAHUB_GRAPH_CONTEXT_KEY, None)

    self.log.debug(
        "Running the SQL parser %s (platform=%s, default db=%s, schema=%s): %s",
        "with graph client" if graph else "in offline mode",
@@ -232,7 +255,28 @@ def _sql_extractor_extract(self: "SqlExtractor") -> TaskMetadata:
    )


-def snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]:
class BigQueryInsertJobOperatorExtractor(BaseExtractor):
    def extract(self) -> Optional[TaskMetadata]:
        from airflow.providers.google.cloud.operators.bigquery import (
            BigQueryInsertJobOperator,  # type: ignore
        )

        operator: "BigQueryInsertJobOperator" = self.operator
        sql = operator.configuration.get("query")
        if not sql:
            self.log.warning("No query found in BigQueryInsertJobOperator")
            return None

        return _parse_sql_into_task_metadata(
            self,
            sql,
            platform="bigquery",
            default_database=operator.project_id,
            default_schema=None,
        )


def _snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]:
    if hasattr(self.operator, "schema") and self.operator.schema is not None:
        return self.operator.schema
    return (
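For reference, a sketch of the kind of task the new extractor targets (project, dataset, and table names are illustrative). One caveat worth flagging: in the BigQuery jobs API, `configuration["query"]` is usually itself a dict whose nested `"query"` key holds the SQL string, so the `configuration.get("query")` call above can return a dict rather than a string; handling that nested shape may need a follow-up.

```python
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

insert_job = BigQueryInsertJobOperator(
    task_id="insert_rows",
    project_id="my-gcp-project",  # used as default_database when parsing the SQL
    configuration={
        "query": {
            "query": "INSERT INTO mydataset.dest SELECT * FROM mydataset.src",
            "useLegacySql": False,
        }
    },
)
```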