Skip to content

Commit

Permalink
Bugfix BigQueryToMsSqlOperator (#39171)
Browse files Browse the repository at this point in the history
  • Loading branch information
moiseenkov committed Apr 23, 2024
1 parent 4384024 commit 4ae85d7
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __init__(
self.source_project_dataset_table = source_project_dataset_table

def get_sql_hook(self) -> MsSqlHook:
return MsSqlHook(schema=self.database, mysql_conn_id=self.mssql_conn_id)
return MsSqlHook(schema=self.database, mssql_conn_id=self.mssql_conn_id)

def persist_links(self, context: Context) -> None:
project_id, dataset_id, table_id = self.source_project_dataset_table.split(".")
Expand Down
1 change: 0 additions & 1 deletion tests/always/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ def test_providers_modules_should_have_tests(self):
"tests/providers/google/cloud/operators/vertex_ai/test_model_service.py",
"tests/providers/google/cloud/operators/vertex_ai/test_pipeline_job.py",
"tests/providers/google/cloud/sensors/test_dataform.py",
"tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py",
"tests/providers/google/cloud/transfers/test_bigquery_to_sql.py",
"tests/providers/google/cloud/transfers/test_mssql_to_gcs.py",
"tests/providers/google/cloud/transfers/test_presto_to_gcs.py",
Expand Down
88 changes: 88 additions & 0 deletions tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from unittest import mock

from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator

TASK_ID = "test-bq-create-table-operator"
TEST_DATASET = "test-dataset"
TEST_TABLE_ID = "test-table-id"
TEST_DAG_ID = "test-bigquery-operators"
TEST_PROJECT = "test-project"


class TestBigQueryToMsSqlOperator:
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mssql.BigQueryTableLink")
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_sql.BigQueryHook")
def test_execute_good_request_to_bq(self, mock_hook, mock_link):
destination_table = "table"
operator = BigQueryToMsSqlOperator(
task_id=TASK_ID,
source_project_dataset_table=f"{TEST_PROJECT}.{TEST_DATASET}.{TEST_TABLE_ID}",
target_table_name=destination_table,
replace=False,
)

operator.execute(None)
mock_hook.return_value.list_rows.assert_called_once_with(
dataset_id=TEST_DATASET,
table_id=TEST_TABLE_ID,
max_results=1000,
selected_fields=None,
start_index=0,
)

@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mssql.MsSqlHook")
def test_get_sql_hook(self, mock_hook):
hook_expected = mock_hook.return_value

destination_table = "table"
operator = BigQueryToMsSqlOperator(
task_id=TASK_ID,
source_project_dataset_table=f"{TEST_PROJECT}.{TEST_DATASET}.{TEST_TABLE_ID}",
target_table_name=destination_table,
replace=False,
)

hook_actual = operator.get_sql_hook()

assert hook_actual == hook_expected
mock_hook.assert_called_once_with(schema=operator.database, mssql_conn_id=operator.mssql_conn_id)

@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mssql.BigQueryTableLink")
def test_persist_links(self, mock_link):
mock_context = mock.MagicMock()

destination_table = "table"
operator = BigQueryToMsSqlOperator(
task_id=TASK_ID,
source_project_dataset_table=f"{TEST_PROJECT}.{TEST_DATASET}.{TEST_TABLE_ID}",
target_table_name=destination_table,
replace=False,
)
operator.persist_links(context=mock_context)

mock_link.persist.assert_called_once_with(
context=mock_context,
task_instance=operator,
dataset_id=TEST_DATASET,
project_id=TEST_PROJECT,
table_id=TEST_TABLE_ID,
)

0 comments on commit 4ae85d7

Please sign in to comment.