diff --git a/airflow/providers/google/cloud/utils/dataform.py b/airflow/providers/google/cloud/utils/dataform.py
index 743ac06950bbc..3395843cec420 100644
--- a/airflow/providers/google/cloud/utils/dataform.py
+++ b/airflow/providers/google/cloud/utils/dataform.py
@@ -40,6 +40,7 @@ def make_initialization_workspace_flow(
     region: str,
     repository_id: str,
     workspace_id: str,
+    dataform_schema_name: str = "dataform",
     package_name: str | None = None,
     without_installation: bool = False,
 ) -> tuple:
@@ -50,6 +51,7 @@ def make_initialization_workspace_flow(
     :param region: Required. The ID of the Google Cloud region where workspace located.
     :param repository_id: Required. The ID of the Dataform repository where workspace located.
     :param workspace_id: Required. The ID of the Dataform workspace which requires initialization.
+    :param dataform_schema_name: Name of the schema.
     :param package_name: Name of the package. If value is not provided then workspace_id will be used.
     :param without_installation: Defines should installation of npm packages be added to flow.
     """
@@ -135,7 +137,7 @@ def make_initialization_workspace_flow(
     default_location: str = define_default_location(region).value
     dataform_config_content = json.dumps(
         {
-            "defaultSchema": "dataform",
+            "defaultSchema": dataform_schema_name,
             "assertionSchema": "dataform_assertions",
             "warehouse": "bigquery",
             "defaultDatabase": project_id,
diff --git a/tests/system/providers/google/cloud/dataform/example_dataform.py b/tests/system/providers/google/cloud/dataform/example_dataform.py
index 5ea66b6fa8ce5..f47f8579c501e 100644
--- a/tests/system/providers/google/cloud/dataform/example_dataform.py
+++ b/tests/system/providers/google/cloud/dataform/example_dataform.py
@@ -55,7 +55,7 @@
 REPOSITORY_ID = f"example_dataform_repository_{ENV_ID}"
 REGION = "us-central1"
 WORKSPACE_ID = f"example_dataform_workspace_{ENV_ID}"
-DEFAULT_DATASET = "dataform"
+DATAFORM_SCHEMA_NAME = f"schema_{DAG_ID}_{ENV_ID}"
 
 # This DAG is not self-run we need to do some extra configuration to execute it in automation process
 with models.DAG(
@@ -84,13 +84,6 @@
     )
     # [END howto_operator_create_workspace]
 
-    # Delete the default dataset if it exists in the bigquery
-    delete_dataset = BigQueryDeleteDatasetOperator(
-        task_id="delete_dataset",
-        dataset_id=DEFAULT_DATASET,
-        delete_contents=True,
-    )
-
     # [START howto_initialize_workspace]
     first_initialization_step, last_initialization_step = make_initialization_workspace_flow(
         project_id=PROJECT_ID,
@@ -99,6 +92,7 @@
         workspace_id=WORKSPACE_ID,
         package_name=f"dataform_package_{ENV_ID}",
         without_installation=True,
+        dataform_schema_name=DATAFORM_SCHEMA_NAME,
     )
     # [END howto_initialize_workspace]
 
@@ -259,9 +253,9 @@
     )
     # [END howto_operator_remove_directory]
 
-    delete_dataset_2 = BigQueryDeleteDatasetOperator(
-        task_id="delete_dataset_2",
-        dataset_id=DEFAULT_DATASET,
+    delete_dataset = BigQueryDeleteDatasetOperator(
+        task_id="delete_dataset",
+        dataset_id=DATAFORM_SCHEMA_NAME,
         delete_contents=True,
         trigger_rule=TriggerRule.ALL_DONE,
     )
@@ -289,7 +283,7 @@
 
     delete_repository.trigger_rule = TriggerRule.ALL_DONE
 
-    (make_repository >> make_workspace >> delete_dataset >> first_initialization_step)
+    (make_repository >> make_workspace >> first_initialization_step)
     (
         last_initialization_step
         >> install_npm_packages
@@ -305,7 +299,7 @@
         >> write_test_file
         >> remove_test_file
         >> remove_test_directory
-        >> delete_dataset_2
+        >> delete_dataset
         >> delete_workspace
         >> delete_repository
     )