/
test_bigquery_to_bigquery.py
112 lines (100 loc) · 4.47 KB
/
test_bigquery_to_bigquery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from unittest import mock
from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator
BQ_HOOK_PATH = "airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryHook"
TASK_ID = "test-bq-create-table-operator"
TEST_GCP_PROJECT_ID = "test-project"
TEST_DATASET = "test-dataset"
TEST_TABLE_ID = "test-table-id"
SOURCE_PROJECT_DATASET_TABLES = f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}"
DESTINATION_PROJECT_DATASET_TABLE = f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET + '_new'}.{TEST_TABLE_ID}"
WRITE_DISPOSITION = "WRITE_EMPTY"
CREATE_DISPOSITION = "CREATE_IF_NEEDED"
LABELS = {"k1": "v1"}
ENCRYPTION_CONFIGURATION = {"key": "kk"}
def split_tablename_side_effect(*args, **kwargs):
    """Stand-in for ``BigQueryHook.split_tablename`` used via ``mock.side_effect``.

    Maps the two fully-qualified table names used by these tests to their
    ``(project_id, dataset_id, table_id)`` components.

    Returns:
        tuple[str, str, str]: the components for the source or destination table.

    Raises:
        ValueError: if ``table_input`` is neither of the expected test tables.
            The original fell through and implicitly returned ``None``, which
            surfaced later as an opaque unpacking error; failing loudly here
            points straight at the bad lookup.
    """
    table_input = kwargs["table_input"]
    if table_input == SOURCE_PROJECT_DATASET_TABLES:
        return TEST_GCP_PROJECT_ID, TEST_DATASET, TEST_TABLE_ID
    if table_input == DESTINATION_PROJECT_DATASET_TABLE:
        return TEST_GCP_PROJECT_ID, TEST_DATASET + "_new", TEST_TABLE_ID
    raise ValueError(f"Unexpected table_input: {table_input!r}")
class TestBigQueryToBigQueryOperator:
    """Unit tests for ``BigQueryToBigQueryOperator`` with ``BigQueryHook`` mocked out."""

    @mock.patch(BQ_HOOK_PATH)
    def test_execute_without_location_should_execute_successfully(self, mock_hook):
        """Without an explicit location, execute() submits a copy job whose
        configuration is built from the resolved table components."""
        # Resolve the fully-qualified test table names into their components.
        mock_hook.return_value.split_tablename.side_effect = split_tablename_side_effect

        op = BigQueryToBigQueryOperator(
            task_id=TASK_ID,
            source_project_dataset_tables=SOURCE_PROJECT_DATASET_TABLES,
            destination_project_dataset_table=DESTINATION_PROJECT_DATASET_TABLE,
            write_disposition=WRITE_DISPOSITION,
            create_disposition=CREATE_DISPOSITION,
            labels=LABELS,
            encryption_configuration=ENCRYPTION_CONFIGURATION,
        )
        op.execute(context=mock.MagicMock())

        expected_configuration = {
            "copy": {
                "createDisposition": CREATE_DISPOSITION,
                "destinationEncryptionConfiguration": ENCRYPTION_CONFIGURATION,
                "destinationTable": {
                    "datasetId": TEST_DATASET + "_new",
                    "projectId": TEST_GCP_PROJECT_ID,
                    "tableId": TEST_TABLE_ID,
                },
                "sourceTables": [
                    {
                        "datasetId": TEST_DATASET,
                        "projectId": TEST_GCP_PROJECT_ID,
                        "tableId": TEST_TABLE_ID,
                    },
                ],
                "writeDisposition": WRITE_DISPOSITION,
            },
            "labels": LABELS,
        }
        mock_hook.return_value.insert_job.assert_called_once_with(
            configuration=expected_configuration,
            project_id=mock_hook.return_value.project_id,
        )

    @mock.patch(BQ_HOOK_PATH)
    def test_execute_single_regional_location_should_execute_successfully(self, mock_hook):
        """With a single regional location set, execute() polls the inserted
        job's status from that same location."""
        location = "us-central1"
        mock_hook.return_value.split_tablename.side_effect = split_tablename_side_effect

        op = BigQueryToBigQueryOperator(
            task_id=TASK_ID,
            source_project_dataset_tables=SOURCE_PROJECT_DATASET_TABLES,
            destination_project_dataset_table=DESTINATION_PROJECT_DATASET_TABLE,
            write_disposition=WRITE_DISPOSITION,
            create_disposition=CREATE_DISPOSITION,
            labels=LABELS,
            encryption_configuration=ENCRYPTION_CONFIGURATION,
            location=location,
        )
        op.execute(context=mock.MagicMock())

        mock_hook.return_value.get_job.assert_called_once_with(
            job_id=mock_hook.return_value.insert_job.return_value.job_id,
            location=location,
        )