Fix Datafusion system tests (#32749)
VladaZakharova committed Jul 23, 2023
1 parent 56c41d4 commit 82e6226
Showing 3 changed files with 87 additions and 64 deletions.
23 changes: 17 additions & 6 deletions airflow/providers/google/cloud/hooks/datafusion.py
@@ -319,7 +319,7 @@ def create_pipeline(
namespace: str = "default",
) -> None:
"""
Creates a Cloud Data Fusion pipeline.
Creates a batch Cloud Data Fusion pipeline.
:param pipeline_name: Your pipeline name.
:param pipeline: The pipeline definition. For more information check:
@@ -343,12 +343,12 @@ def delete_pipeline(
namespace: str = "default",
) -> None:
"""
Deletes a Cloud Data Fusion pipeline.
Deletes a batch Cloud Data Fusion pipeline.
:param pipeline_name: Your pipeline name.
:param version_id: Version of pipeline to delete
:param instance_url: Endpoint on which the REST APIs is accessible for the instance.
:param namespace: f your pipeline belongs to a Basic edition instance, the namespace ID
:param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID
is always default. If your pipeline belongs to an Enterprise edition instance, you
can create a namespace.
"""
@@ -357,9 +357,20 @@ def delete_pipeline(
url = os.path.join(url, "versions", version_id)

response = self._cdap_request(url=url, method="DELETE", body=None)
self._check_response_status_and_data(
response, f"Deleting a pipeline failed with code {response.status}"
)
# Check for 409 error: the previous step for starting/stopping pipeline could still be in progress.
# Waiting some time before retry.
for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
try:
self._check_response_status_and_data(
response, f"Deleting a pipeline failed with code {response.status}: {response.data}"
)
break
except AirflowException as exc:
if "409" in str(exc):
sleep(time_to_wait)
response = self._cdap_request(url=url, method="DELETE", body=None)
else:
raise

def list_pipelines(
self,
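The retry added to delete_pipeline above polls the CDAP DELETE endpoint and backs off while a previous start/stop call is still settling (the 409 conflict). A minimal standalone sketch of the same pattern, assuming a hook that exposes the _cdap_request and _check_response_status_and_data helpers shown in the diff (the delete_with_backoff wrapper itself is hypothetical):

from time import sleep

from google.api_core.retry import exponential_sleep_generator

from airflow.exceptions import AirflowException


def delete_with_backoff(hook, url: str) -> None:
    """Hypothetical helper: retry a CDAP DELETE while the API keeps answering 409 (conflict)."""
    response = hook._cdap_request(url=url, method="DELETE", body=None)
    for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
        try:
            hook._check_response_status_and_data(
                response, f"Deleting a pipeline failed with code {response.status}: {response.data}"
            )
            return  # delete accepted, stop retrying
        except AirflowException as exc:
            if "409" not in str(exc):
                raise  # any other error is a real failure
            # A previous start/stop of the pipeline is still in progress; wait and retry.
            sleep(time_to_wait)
            response = hook._cdap_request(url=url, method="DELETE", body=None)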
@@ -23,7 +23,6 @@
from datetime import datetime

from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.datafusion import (
CloudDataFusionCreateInstanceOperator,
CloudDataFusionCreatePipelineOperator,
@@ -36,34 +35,36 @@
CloudDataFusionStopPipelineOperator,
CloudDataFusionUpdateInstanceOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor
from airflow.utils.trigger_rule import TriggerRule

# [START howto_data_fusion_env_variables]
SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
LOCATION = "europe-north1"
INSTANCE_NAME = "airflow-test-instance"
DAG_ID = "example_data_fusion"
INSTANCE_NAME = "test-instance"
INSTANCE = {
"type": "BASIC",
"displayName": INSTANCE_NAME,
"dataprocServiceAccount": SERVICE_ACCOUNT,
}

BUCKET_1 = "test-datafusion-1"
BUCKET_2 = "test-datafusion-2"
BUCKET_1_URI = f"gs://{BUCKET_1}"
BUCKET_2_URI = f"gs://{BUCKET_2}"
BUCKET_NAME_1 = "test-datafusion-1"
BUCKET_NAME_2 = "test-datafusion-2"
BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"

PIPELINE_NAME = "test-pipe"
PIPELINE = {
"artifact": {
"name": "cdap-data-pipeline",
"version": "6.7.2",
"version": "6.8.3",
"scope": "SYSTEM",
},
"description": "Data Pipeline Application",
"name": "test-pipe",
"name": PIPELINE_NAME,
"config": {
"resources": {"memoryMB": 2048, "virtualCores": 1},
"driverResources": {"memoryMB": 2048, "virtualCores": 1},
@@ -80,7 +81,7 @@
"name": "GCSFile",
"type": "batchsource",
"label": "GCS",
"artifact": {"name": "google-cloud", "version": "0.20.3", "scope": "SYSTEM"},
"artifact": {"name": "google-cloud", "version": "0.21.2", "scope": "SYSTEM"},
"properties": {
"project": "auto-detect",
"format": "text",
@@ -91,7 +92,7 @@
"encrypted": "false",
"schema": '{"type":"record","name":"textfile","fields":[{"name"\
:"offset","type":"long"},{"name":"body","type":"string"}]}',
"path": BUCKET_1_URI,
"path": BUCKET_NAME_1_URI,
"referenceName": "foo_bucket",
"useConnection": "false",
"serviceAccountType": "filePath",
@@ -109,7 +110,7 @@
"name": "GCS",
"type": "batchsink",
"label": "GCS2",
"artifact": {"name": "google-cloud", "version": "0.20.3", "scope": "SYSTEM"},
"artifact": {"name": "google-cloud", "version": "0.21.2", "scope": "SYSTEM"},
"properties": {
"project": "auto-detect",
"suffix": "yyyy-MM-dd-HH-mm",
@@ -119,7 +120,7 @@
"schema": '{"type":"record","name":"textfile","fields":[{"name"\
:"offset","type":"long"},{"name":"body","type":"string"}]}',
"referenceName": "bar",
"path": BUCKET_2_URI,
"path": BUCKET_NAME_2_URI,
"serviceAccountType": "filePath",
"contentType": "application/octet-stream",
},
@@ -146,19 +147,20 @@
# [END howto_data_fusion_env_variables]

with models.DAG(
"example_data_fusion",
DAG_ID,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example", "datafusion"],
) as dag:
create_bucket1 = GCSCreateBucketOperator(
task_id="create_bucket1",
bucket_name=BUCKET_1,
bucket_name=BUCKET_NAME_1,
project_id=PROJECT_ID,
)

create_bucket2 = GCSCreateBucketOperator(
task_id="create_bucket2",
bucket_name=BUCKET_2,
bucket_name=BUCKET_NAME_2,
project_id=PROJECT_ID,
)

@@ -255,38 +257,44 @@
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="delete_pipeline",
trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_cloud_data_fusion_delete_pipeline]

# [START howto_cloud_data_fusion_delete_instance_operator]
delete_instance = CloudDataFusionDeleteInstanceOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="delete_instance"
location=LOCATION,
instance_name=INSTANCE_NAME,
task_id="delete_instance",
trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_cloud_data_fusion_delete_instance_operator]

# Add sleep before creating pipeline
sleep = BashOperator(task_id="sleep", bash_command="sleep 60")

# Add sleep before creating pipeline
sleep_30 = BashOperator(task_id="sleep_30", bash_command="sleep 30")
delete_bucket1 = GCSDeleteBucketOperator(
task_id="delete_bucket1", bucket_name=BUCKET_NAME_1, trigger_rule=TriggerRule.ALL_DONE
)
delete_bucket2 = GCSDeleteBucketOperator(
task_id="delete_bucket2", bucket_name=BUCKET_NAME_1, trigger_rule=TriggerRule.ALL_DONE
)

(
create_bucket1
>> create_bucket2
# TEST SETUP
[create_bucket1, create_bucket2]
# TEST BODY
>> create_instance
>> get_instance
>> restart_instance
>> update_instance
>> sleep
>> create_pipeline
>> list_pipelines
>> start_pipeline_async
>> start_pipeline_sensor
>> start_pipeline
>> stop_pipeline
>> sleep_30
>> delete_pipeline
>> delete_instance
# TEST TEARDOWN
>> [delete_bucket1, delete_bucket2]
)

from tests.system.utils.watcher import watcher
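The watcher import above belongs to the standard Airflow system-test footer, which routes every task through a watcher so that a failed ALL_DONE teardown still fails the run, and exposes the DAG to pytest. A sketch of that conventional footer, assuming the usual tests.system.utils helpers (the exact lines are collapsed in this view):

# This example needs watcher so that success/failure is reported correctly
# when a teardown task with a trigger rule is part of the DAG.
list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see tests/system/README.md).
test_run = get_test_run(dag)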
@@ -23,7 +23,6 @@
from datetime import datetime

from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.datafusion import (
CloudDataFusionCreateInstanceOperator,
CloudDataFusionCreatePipelineOperator,
@@ -36,33 +35,35 @@
CloudDataFusionStopPipelineOperator,
CloudDataFusionUpdateInstanceOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.utils.trigger_rule import TriggerRule

# [START howto_data_fusion_env_variables]
SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
DAG_ID = "example_data_fusion_async"
LOCATION = "europe-north1"
INSTANCE_NAME = "airflow-test-instance"
INSTANCE_NAME = "test-instance-async"
INSTANCE = {
"type": "BASIC",
"displayName": INSTANCE_NAME,
"dataprocServiceAccount": SERVICE_ACCOUNT,
}

BUCKET_1 = "test-datafusion-1"
BUCKET_2 = "test-datafusion-2"
BUCKET_1_URI = f"gs://{BUCKET_1}"
BUCKET_2_URI = f"gs://{BUCKET_2}"
BUCKET_NAME_1 = "test-datafusion-async-1"
BUCKET_NAME_2 = "test-datafusion-async-2"
BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"

PIPELINE_NAME = "test-pipe"
PIPELINE = {
"artifact": {
"name": "cdap-data-pipeline",
"version": "6.7.2",
"version": "6.8.3",
"scope": "SYSTEM",
},
"description": "Data Pipeline Application",
"name": "test-pipe",
"name": PIPELINE_NAME,
"config": {
"resources": {"memoryMB": 2048, "virtualCores": 1},
"driverResources": {"memoryMB": 2048, "virtualCores": 1},
@@ -79,7 +80,7 @@
"name": "GCSFile",
"type": "batchsource",
"label": "GCS",
"artifact": {"name": "google-cloud", "version": "0.20.3", "scope": "SYSTEM"},
"artifact": {"name": "google-cloud", "version": "0.21.2", "scope": "SYSTEM"},
"properties": {
"project": "auto-detect",
"format": "text",
@@ -90,7 +91,7 @@
"encrypted": "false",
"schema": '{"type":"record","name":"textfile","fields":[{"name"\
:"offset","type":"long"},{"name":"body","type":"string"}]}',
"path": BUCKET_1_URI,
"path": BUCKET_NAME_1_URI,
"referenceName": "foo_bucket",
"useConnection": "false",
"serviceAccountType": "filePath",
@@ -108,7 +109,7 @@
"name": "GCS",
"type": "batchsink",
"label": "GCS2",
"artifact": {"name": "google-cloud", "version": "0.20.3", "scope": "SYSTEM"},
"artifact": {"name": "google-cloud", "version": "0.21.2", "scope": "SYSTEM"},
"properties": {
"project": "auto-detect",
"suffix": "yyyy-MM-dd-HH-mm",
@@ -118,7 +119,7 @@
"schema": '{"type":"record","name":"textfile","fields":[{"name"\
:"offset","type":"long"},{"name":"body","type":"string"}]}',
"referenceName": "bar",
"path": BUCKET_2_URI,
"path": BUCKET_NAME_2_URI,
"serviceAccountType": "filePath",
"contentType": "application/octet-stream",
},
@@ -145,19 +146,20 @@
# [END howto_data_fusion_env_variables]

with models.DAG(
"example_data_fusion_async",
DAG_ID,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example", "datafusion", "deferrable"],
) as dag:
create_bucket1 = GCSCreateBucketOperator(
task_id="create_bucket1",
bucket_name=BUCKET_1,
bucket_name=BUCKET_NAME_1,
project_id=PROJECT_ID,
)

create_bucket2 = GCSCreateBucketOperator(
task_id="create_bucket2",
bucket_name=BUCKET_2,
bucket_name=BUCKET_NAME_2,
project_id=PROJECT_ID,
)

@@ -209,7 +211,7 @@
# [END howto_cloud_data_fusion_list_pipelines]

# [START howto_cloud_data_fusion_start_pipeline_def]
start_pipeline_async = CloudDataFusionStartPipelineOperator(
start_pipeline_def = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
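The async example renames the start task to start_pipeline_def; the remaining arguments are collapsed above. A hedged sketch of how such a deferrable start is typically configured, assuming the provider's deferrable flag (not visible in this view):

start_pipeline_def = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="start_pipeline_def",
    deferrable=True,  # assumption: hand polling off to the triggerer instead of blocking a worker slot
)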
@@ -233,40 +235,42 @@
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="delete_pipeline",
trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_cloud_data_fusion_delete_pipeline]

# [START howto_cloud_data_fusion_delete_instance_operator]
delete_instance = CloudDataFusionDeleteInstanceOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="delete_instance"
location=LOCATION,
instance_name=INSTANCE_NAME,
task_id="delete_instance",
trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_cloud_data_fusion_delete_instance_operator]
#
# Add sleep before creating pipeline
sleep_30_1 = BashOperator(task_id="sleep_30_1", bash_command="sleep 30")

# Add sleep before deleting pipeline
sleep_30 = BashOperator(task_id="sleep_30", bash_command="sleep 30")

# Add sleep before starting pipeline
sleep_20 = BashOperator(task_id="sleep_20", bash_command="sleep 40")
delete_bucket1 = GCSDeleteBucketOperator(
task_id="delete_bucket1", bucket_name=BUCKET_NAME_1, trigger_rule=TriggerRule.ALL_DONE
)
delete_bucket2 = GCSDeleteBucketOperator(
task_id="delete_bucket2", bucket_name=BUCKET_NAME_1, trigger_rule=TriggerRule.ALL_DONE
)

(
create_bucket1
>> create_bucket2
# TEST SETUP
[create_bucket1, create_bucket2]
# TEST BODY
>> create_instance
>> get_instance
>> restart_instance
>> update_instance
>> sleep_30_1
>> create_pipeline
>> list_pipelines
>> sleep_20
>> start_pipeline_async
>> start_pipeline_def
>> stop_pipeline
>> sleep_30
>> delete_pipeline
>> delete_instance
# TEST TEARDOWN
>> [delete_bucket1, delete_bucket2]
)

from tests.system.utils.watcher import watcher
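Both DAGs now give their cleanup tasks trigger_rule=TriggerRule.ALL_DONE, so the pipeline, instance and buckets are removed even when an upstream test step fails. A minimal illustration of the idea with a hypothetical teardown task:

from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator
from airflow.utils.trigger_rule import TriggerRule

# With the default ALL_SUCCESS rule the bucket would leak whenever an upstream task failed;
# ALL_DONE runs the teardown once upstream tasks finish, regardless of their state.
delete_bucket = GCSDeleteBucketOperator(
    task_id="delete_bucket",
    bucket_name="test-datafusion-1",  # example value reused from this test
    trigger_rule=TriggerRule.ALL_DONE,
)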
