Add Source Code and Testing for Google Cloud's Data Pipelines Create Operator (#32843)

* CreateDataPipeline operator and hook created.

* Added DAG for datapipeline to test that it runs as expected

* Created the run pipeline hook; updated the data pipeline create hook and operator to properly accept the request body

* updated the create pipeline to accept the body. created run pipeline hook. updated operator accordingly

* Add RunDataPipelineOperator

* Example DAGs are working for creating a new pipeline. Committing minor changes before working on the run operator/hook example DAG and testing

* return full response body update

* The Create and Run operators are meant to work together, so it was recommended that the run operator accept the entire response body.
Updated the Run Operator to accept the response body dict and removed the other parameters because they were used to build the pipeline name, which the response body includes.
Created variable for the pipeline name that takes it from the given response body. Used it to call upon the hook.
Updated the hook to accept the pipeline name and removed the building of the name.
Added run_data_pipeline to example DAGs to call the Run Data Pipeline operator, passing a pipeline name taken from the create_data_pipeline result (which should be a response dict).

* Update variable names to be consistent

* Update the Run Operator to accept the Pipeline name instead of the response body.

* Update example dags to reflect that only the name is being returned from the Create Operator

* Create Operator returns full response body; example DAG reflects that

* Fixed syntax errors; gave run example DAG a task ID and pipeline name for testing

* Added project_id and location to run pipeline hook.
Updated the dags to work with dataflow-interns. Running properly.
Added project_id and location to run operator.

* add testing file

* Change created pipeline source to airflow

* Fix source format and remove whitespace

* Raise exception if data_pipeline_name not given

* Removed unnecessary variable data_pipeline_name from the operator and all places it was referenced for the create.
Added code comments to explain parameters for operators and hooks.
Updated run hook to utilize all variables.
Removed some whitespace.

* Updated code comments

* Updated comments to describe the return values

* Create hooks test file, rename operator test file

* Updating Branch

* Change created pipeline source to airflow

* Fix source format and remove whitespace

* Create hooks test file, rename operator test file

* Change created pipeline source to airflow

* Fix source format and remove whitespace

* Raise exception if data_pipeline_name not given

* Create hooks test file, rename operator test file

* Change created pipeline source to airflow

* Fix source format and remove whitespace

* Change created pipeline source to airflow

* Fix source format and remove whitespace

* Raise exception if data_pipeline_name not given

* Create hooks test file, rename operator test file

* Add Airflow Exceptions for run operator

* Test variables and operator used for testing the create op. Some imports that I believe will be necessary for testing the create op

* Change created pipeline source to airflow

* Raise exception if data_pipeline_name not given

* Create hooks test file, rename operator test file

* Add Airflow Exceptions for run operator

* Raise exception if data_pipeline_name not given

* Create hooks test file, rename operator test file

* Raise exception if data_pipeline_name not given

* Create hooks test file, rename operator test file

* Merge Conflicts resolved

* Added Airflow exception for when the body parameter is not given; cannot create the pipeline without it.

* Fixed indentation

* address PR comments

* accidentally deleted the service access line

* Addresses PR comment

* Re: xianhualiu; Raise exception if pipeline run error

* Passing test cases when the Create DataPipeline Operator is given the correct input

* Passing test for when the project_id param is not given.

* Passing tests that check that the operator is passed the body and location.

* Create test for run operator execute

* Accidentally added to wrong branch, undoing

* undoing

* Passing test that if the response body contains an error an Airflow Exception is raised

* Re: xianhualiu; Raise exception if pipeline run error

* Create test for run operator execute

* undoing

* Removed the Run Operator test because everything was not pushed

* Create tests for run operator

* Remove import; using mock for hook

* Change test names to be consistent with create operator tests

* Commit for host to review. Renders an error that gcp_conn_id doesn't exist because it is not mocking properly, and that .create is not called in reference to line 127. These should only get feedback and not be approved to merge

* .

* minor fix to params in mock request

* Create hooks tests for running pipeline

* Update grammar

* Remove comment

* update

* Update

* Revert "Add tests for run operator"

* Minor removals based on Manav comments

* Revert "Revert "Add tests for run operator""

* Test hook for running data pipeline

* remove comment

* test_create_data_pipeline was failing because "create()"
was not being called. This was because I used the wrong test_parent and my project ID wasn't the same all around. Fixed this and removed the unnecessary patch. Now focusing on the mock GCP conn issue.

* Assert that run_data_pipeline call returns job

* completed code. cleaned up unnecessary imports

* removed an overlooked conflict from when I merged the run test

* formatting all files created for both operators and hooks

* Merging the Create Operator code to Apache

* prepare for sync

* delete dags for dataflow

* Altered DAG to use variables instead of hard coded values.

* Renamed tests to better suit their function

* Create and start documentation for operators

* Added documentation draft for create operator.

* Adding Data Pipelines API REST Resource to the documentation as a reference.

* Move DAGs file to the system test DataPipelines folder

* updated DAGs to create a local bucket and pull from that instead of the external resource. Added "resources" folder.

* added impersonation chain & updated comments

* static check fixes

* adding license

* adding resource files.
add None check for the data_pipeline

* updated unit test to account for impersonation chain

* Fixed failing pre-commits

* updated documentation to pull from the correct file location

* increase underline length

---------

Co-authored-by: shaniyaclement <shaniya.clement17@gmail.com>
Co-authored-by: Brenda Pham <bloop@google.com>
Co-authored-by: Brenda Pham <93027753+blpham@users.noreply.github.com>
4 people committed Aug 14, 2023
1 parent 8bf53dd commit a2a0d05
Showing 11 changed files with 4,968 additions and 0 deletions.
89 changes: 89 additions & 0 deletions airflow/providers/google/cloud/hooks/datapipeline.py
@@ -0,0 +1,89 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains a Google Data Pipelines Hook."""
from __future__ import annotations

from typing import Sequence

from googleapiclient.discovery import build

from airflow.providers.google.common.hooks.base_google import (
    GoogleBaseHook,
)

DEFAULT_DATAPIPELINE_LOCATION = "us-central1"


class DataPipelineHook(GoogleBaseHook):
    """
    Hook for Google Data Pipelines.

    All the methods in the hook where project_id is used must be called with
    keyword arguments rather than positional.
    """

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(
            gcp_conn_id=gcp_conn_id,
            impersonation_chain=impersonation_chain,
        )

    def get_conn(self) -> build:
        """Returns a Google Cloud Data Pipelines service object."""
        http_authorized = self._authorize()
        return build("datapipelines", "v1", http=http_authorized, cache_discovery=False)

    @GoogleBaseHook.fallback_to_default_project_id
    def create_data_pipeline(
        self,
        body: dict,
        project_id: str,
        location: str = DEFAULT_DATAPIPELINE_LOCATION,
    ) -> dict:
        """
        Creates a new Data Pipelines instance from the Data Pipelines API.

        :param body: The request body (contains instance of Pipeline). See:
            https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines/create#request-body
        :param project_id: The ID of the GCP project that owns the job.
        :param location: The location to direct the Data Pipelines instance to (for example us-central1).

        Returns the created Data Pipelines instance in JSON representation.
        """
        parent = self.build_parent_name(project_id, location)
        service = self.get_conn()
        self.log.info(dir(service.projects().locations()))
        request = (
            service.projects()
            .locations()
            .pipelines()
            .create(
                parent=parent,
                body=body,
            )
        )
        response = request.execute(num_retries=self.num_retries)
        return response

    @staticmethod
    def build_parent_name(project_id: str, location: str):
        return f"projects/{project_id}/locations/{location}"
102 changes: 102 additions & 0 deletions airflow/providers/google/cloud/operators/datapipeline.py
@@ -0,0 +1,102 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Google Data Pipelines operators."""
from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

from airflow import AirflowException
from airflow.providers.google.cloud.hooks.datapipeline import DEFAULT_DATAPIPELINE_LOCATION, DataPipelineHook
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator

if TYPE_CHECKING:
from airflow.utils.context import Context


class CreateDataPipelineOperator(GoogleCloudBaseOperator):
    """
    Creates a new Data Pipelines instance from the Data Pipelines API.

    :param body: The request body (contains instance of Pipeline). See:
        https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines/create#request-body
    :param project_id: The ID of the GCP project that owns the job.
    :param location: The location to direct the Data Pipelines instance to (for example us-central1).
    :param gcp_conn_id: The connection ID to connect to the Google Cloud
        Platform.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).

        .. warning::
            This option requires Apache Beam 2.39.0 or newer.

    Returns the created Data Pipelines instance in JSON representation.
    """

    def __init__(
        self,
        *,
        body: dict,
        project_id: str | None = None,
        location: str = DEFAULT_DATAPIPELINE_LOCATION,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        self.body = body
        self.project_id = project_id
        self.location = location
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.datapipeline_hook: DataPipelineHook | None = None
        self.body["pipelineSources"] = {"airflow": "airflow"}

    def execute(self, context: Context):
        if self.body is None:
            raise AirflowException(
                "Request Body not given; cannot create a Data Pipeline without the Request Body."
            )
        if self.project_id is None:
            raise AirflowException(
                "Project ID not given; cannot create a Data Pipeline without the Project ID."
            )
        if self.location is None:
            raise AirflowException("location not given; cannot create a Data Pipeline without the location.")

        self.datapipeline_hook = DataPipelineHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )

        self.data_pipeline = self.datapipeline_hook.create_data_pipeline(
            project_id=self.project_id,
            body=self.body,
            location=self.location,
        )
        if self.data_pipeline:
            if "error" in self.data_pipeline:
                raise AirflowException(self.data_pipeline.get("error").get("message"))

        return self.data_pipeline
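To make the operator's interface concrete, a minimal DAG sketch follows. It is illustrative only: the DAG ID, project, bucket, job name, and template path are placeholders, and this is not the example DAG shipped with the commit's system tests.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.datapipeline import CreateDataPipelineOperator

with models.DAG(
    "example_create_data_pipeline",  # hypothetical DAG ID
    start_date=datetime.datetime(2023, 8, 1),
    schedule=None,
    catchup=False,
):
    create_data_pipeline = CreateDataPipelineOperator(
        task_id="create_data_pipeline",
        project_id="my-project",  # placeholder project
        location="us-central1",
        gcp_conn_id="google_cloud_default",
        body={
            "name": "projects/my-project/locations/us-central1/pipelines/my-pipeline",
            "type": "PIPELINE_TYPE_BATCH",
            "workload": {
                "dataflowFlexTemplateRequest": {
                    "launchParameter": {
                        "containerSpecGcsPath": "gs://my-bucket/templates/template.json",
                        "jobName": "my-job",
                    },
                    "projectId": "my-project",
                    "location": "us-central1",
                }
            },
        },
    )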
11 changes: 11 additions & 0 deletions airflow/providers/google/provider.yaml
@@ -321,6 +321,11 @@ integrations:
      - /docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
    logo: /integration-logos/gcp/Cloud-Dataflow.png
    tags: [gcp]
  - integration-name: Google Data Pipelines
    external-doc-url: https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest
    how-to-guide:
      - /docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst
    tags: [gcp]
  - integration-name: Google Data Fusion
    external-doc-url: https://cloud.google.com/data-fusion/
    how-to-guide:
@@ -496,6 +501,9 @@ operators:
  - integration-name: Google Dataflow
    python-modules:
      - airflow.providers.google.cloud.operators.dataflow
  - integration-name: Google Data Pipelines
    python-modules:
      - airflow.providers.google.cloud.operators.datapipeline
  - integration-name: Google Data Fusion
    python-modules:
      - airflow.providers.google.cloud.operators.datafusion
@@ -710,6 +718,9 @@ hooks:
  - integration-name: Google Dataflow
    python-modules:
      - airflow.providers.google.cloud.hooks.dataflow
  - integration-name: Google Data Pipelines
    python-modules:
      - airflow.providers.google.cloud.hooks.datapipeline
  - integration-name: Google Data Fusion
    python-modules:
      - airflow.providers.google.cloud.hooks.datafusion
60 changes: 60 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst
@@ -0,0 +1,60 @@
.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.

Google Cloud Data Pipelines Operators
=====================================

Data Pipelines is a Dataflow feature that allows customers to create
and schedule recurring jobs, view aggregated job metrics, and define
and manage job SLOs. A pipeline consists of a collection of jobs
including ways to manage them. A pipeline may be associated with a
Dataflow Template (classic/flex) and include all jobs launched with
the associated template.

Prerequisite Tasks
^^^^^^^^^^^^^^^^^^

.. include:: /operators/_partials/prerequisite_tasks.rst

Creating a Data Pipeline
^^^^^^^^^^^^^^^^^^^^^^^^

To create a new Data Pipelines instance using a request body and parent name, use :class:`~airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator`.
The operator accesses Google Cloud's Data Pipelines API and calls the
`create method <https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines/create>`__
to create the given pipeline.

:class:`~airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator` accepts four parameters:

* **body**: the instance of the Pipeline
* **project_id**: the ID of the GCP project that owns the job
* **location**: the destination for the Pipeline
* **gcp_conn_id**: the connection ID to connect to Google Cloud

The request body and project ID need to be passed each time, while the GCP connection ID and location have default values.
The project ID and location are used to build the parent name needed to create the Data Pipelines instance.

Here is an example of how you can create a Data Pipelines instance by passing the above parameters to CreateDataPipelineOperator:

.. exampleinclude:: /../../tests/system/providers/google/cloud/datapipelines/example_datapipeline.py
:language: python
:dedent: 4
:start-after: [START howto_operator_create_data_pipeline]
:end-before: [END howto_operator_create_data_pipeline]

For further information regarding the API usage, see
`Data Pipelines API REST Resource <https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines#Pipeline>`__
in the Google Cloud documentation.
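Because the operator returns the created Data Pipelines instance in JSON representation, the
response is pushed to XCom and can be consumed by downstream tasks. The snippet below is an
illustrative sketch only; the task names and the surrounding DAG are assumptions, not part of
this guide or the commit:

.. code-block:: python

    from airflow.decorators import task


    @task
    def print_pipeline_name(pipeline: dict) -> str:
        # ``pipeline`` is the dict returned by CreateDataPipelineOperator, e.g.
        # {"name": "projects/.../locations/.../pipelines/...", "type": ..., ...}
        return pipeline["name"]


    # Inside a DAG definition, assuming ``create_data_pipeline`` is a
    # CreateDataPipelineOperator task:
    # print_pipeline_name(create_data_pipeline.output)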
110 changes: 110 additions & 0 deletions tests/providers/google/cloud/hooks/test_datapipeline.py
@@ -0,0 +1,110 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from unittest import mock

from airflow.providers.google.cloud.hooks.datapipeline import DataPipelineHook

TASK_ID = "test-datapipeline-operators"
TEST_NAME = "projects/test-project-id/locations/test-location"
TEST_BODY = {
    "name": "projects/test-project-id/locations/test-location/pipelines/test-pipeline",
    "type": "PIPELINE_TYPE_BATCH",
    "workload": {
        "dataflowFlexTemplateRequest": {
            "launchParameter": {
                "containerSpecGcsPath": "gs://dataflow-templates-us-central1/latest/Word_Count_metadata",
                "jobName": "test-job",
                "environment": {"tempLocation": "test-temp-location"},
                "parameters": {
                    "inputFile": "gs://dataflow-samples/shakespeare/kinglear.txt",
                    "output": "gs://test/output/my_output",
                },
            },
            "projectId": "test-project-id",
            "location": "test-location",
        }
    },
}
TEST_LOCATION = "test-location"
TEST_PROJECTID = "test-project-id"
TEST_DATA_PIPELINE_NAME = "test-data-pipeline-name"
TEST_PARENT = "projects/test-project-id/locations/test-location"
TEST_JOB_ID = "test-job-id"
TEST_NAME = "projects/test-project-id/locations/test-location/pipelines/test-data-pipeline-name"


class TestDataPipelineHook:
    """Tests for the DataPipeline Hooks."""

    def setup_method(self):
        self.datapipeline_hook = DataPipelineHook(gcp_conn_id="google_cloud_default")

    @mock.patch("airflow.providers.google.cloud.hooks.datapipeline.DataPipelineHook._authorize")
    @mock.patch("airflow.providers.google.cloud.hooks.datapipeline.build")
    def test_get_conn(self, mock_build, mock_authorize):
        """
        Test that get_conn is called with the correct params and
        returns the correct API address.
        """
        connection = self.datapipeline_hook.get_conn()
        mock_build.assert_called_once_with(
            "datapipelines", "v1", http=mock_authorize.return_value, cache_discovery=False
        )
        assert mock_build.return_value == connection

    @mock.patch("airflow.providers.google.cloud.hooks.datapipeline.DataPipelineHook.build_parent_name")
    def test_build_parent_name(self, mock_build_parent_name):
        """
        Test that build_parent_name is called with the correct params and
        returns the correct parent string.
        """
        result = self.datapipeline_hook.build_parent_name(
            project_id=TEST_PROJECTID,
            location=TEST_LOCATION,
        )
        mock_build_parent_name.assert_called_with(
            project_id=TEST_PROJECTID,
            location=TEST_LOCATION,
        )
        assert mock_build_parent_name.return_value == result

    @mock.patch("airflow.providers.google.cloud.hooks.datapipeline.DataPipelineHook.get_conn")
    def test_create_data_pipeline(self, mock_connection):
        """
        Test that the request is called with the correct params and
        returns the correct value.
        """
        mock_locations = mock_connection.return_value.projects.return_value.locations
        mock_request = mock_locations.return_value.pipelines.return_value.create
        mock_request.return_value.execute.return_value = {"name": TEST_PARENT}

        result = self.datapipeline_hook.create_data_pipeline(
            body=TEST_BODY,
            project_id=TEST_PROJECTID,
            location=TEST_LOCATION,
        )

        mock_request.assert_called_once_with(
            parent=TEST_PARENT,
            body=TEST_BODY,
        )
        assert result == {"name": TEST_PARENT}
