Skip to content

Commit

Permalink
fix setting project_id for gs to bq and bq to gs (#30053)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yaro1 committed Mar 20, 2023
1 parent 732fcd7 commit af4627f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 19 deletions.
Expand Up @@ -193,7 +193,7 @@ def _submit_job(

return hook.insert_job(
configuration=configuration,
project_id=hook.project_id,
project_id=configuration["extract"]["sourceTable"]["projectId"],
location=self.location,
job_id=job_id,
timeout=self.result_timeout,
Expand Down
21 changes: 13 additions & 8 deletions airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
Expand Up @@ -227,6 +227,7 @@ def __init__(
job_id: str | None = None,
force_rerun: bool = True,
reattach_states: set[str] | None = None,
project_id: str | None = None,
**kwargs,
) -> None:

Expand All @@ -249,6 +250,7 @@ def __init__(

# BQ config
self.destination_project_dataset_table = destination_project_dataset_table
self.project_id = project_id
self.schema_fields = schema_fields
if source_format.upper() not in ALLOWED_FORMATS:
raise ValueError(
Expand Down Expand Up @@ -306,7 +308,7 @@ def _submit_job(
# Submit a new job without waiting for it to complete.
return hook.insert_job(
configuration=self.configuration,
project_id=hook.project_id,
project_id=self.project_id,
location=self.location,
job_id=job_id,
timeout=self.result_timeout,
Expand Down Expand Up @@ -507,9 +509,9 @@ def _find_max_value_in_column(self):
raise RuntimeError(f"The {select_command} returned no rows!")

def _create_empty_table(self):
project_id, dataset_id, table_id = self.hook.split_tablename(
self.project_id, dataset_id, table_id = self.hook.split_tablename(
table_input=self.destination_project_dataset_table,
default_project_id=self.hook.project_id or "",
default_project_id=self.project_id or self.hook.project_id,
)

external_config_api_repr = {
Expand Down Expand Up @@ -556,7 +558,7 @@ def _create_empty_table(self):

# build table definition
table = Table(
table_ref=TableReference.from_string(self.destination_project_dataset_table, project_id)
table_ref=TableReference.from_string(self.destination_project_dataset_table, self.project_id)
)
table.external_data_configuration = external_config
if self.labels:
Expand All @@ -573,15 +575,18 @@ def _create_empty_table(self):

self.log.info("Creating external table: %s", self.destination_project_dataset_table)
self.hook.create_empty_table(
table_resource=table_obj_api_repr, project_id=project_id, location=self.location, exists_ok=True
table_resource=table_obj_api_repr,
project_id=self.project_id,
location=self.location,
exists_ok=True,
)
self.log.info("External table created successfully: %s", self.destination_project_dataset_table)
return table_obj_api_repr

def _use_existing_table(self):
destination_project, destination_dataset, destination_table = self.hook.split_tablename(
self.project_id, destination_dataset, destination_table = self.hook.split_tablename(
table_input=self.destination_project_dataset_table,
default_project_id=self.hook.project_id or "",
default_project_id=self.project_id or self.hook.project_id,
var_name="destination_project_dataset_table",
)

Expand All @@ -601,7 +606,7 @@ def _use_existing_table(self):
"autodetect": self.autodetect,
"createDisposition": self.create_disposition,
"destinationTable": {
"projectId": destination_project,
"projectId": self.project_id,
"datasetId": destination_dataset,
"tableId": destination_table,
},
Expand Down
20 changes: 10 additions & 10 deletions tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
Expand Up @@ -172,7 +172,7 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
),
Expand Down Expand Up @@ -233,7 +233,7 @@ def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook):
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
),
Expand Down Expand Up @@ -342,7 +342,7 @@ def test_labels_without_external_table_should_execute_successfully(self, hook):
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
)
Expand Down Expand Up @@ -441,7 +441,7 @@ def test_description_without_external_table_should_execute_successfully(self, ho
fieldDelimiter=",",
),
},
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
location=None,
job_id=pytest.real_job_id,
timeout=None,
Expand Down Expand Up @@ -545,7 +545,7 @@ def test_source_objs_as_list_without_external_table_should_execute_successfully(
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
)
Expand Down Expand Up @@ -645,7 +645,7 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
)
Expand Down Expand Up @@ -746,7 +746,7 @@ def test_schema_obj_without_external_table_should_execute_successfully(self, bq_
"encoding": "UTF-8",
}
},
project_id=bq_hook.return_value.project_id,
project_id=bq_hook.return_value.split_tablename.return_value[0],
location=None,
job_id=pytest.real_job_id,
timeout=None,
Expand Down Expand Up @@ -842,7 +842,7 @@ def test_autodetect_none_without_external_table_should_execute_successfully(self
"encoding": "UTF-8",
}
},
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
location=None,
job_id=pytest.real_job_id,
timeout=None,
Expand Down Expand Up @@ -1067,7 +1067,7 @@ def test_schema_fields_integer_scanner_without_external_table_should_execute_suc
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=bq_hook.return_value.project_id,
project_id=bq_hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
),
Expand Down Expand Up @@ -1129,7 +1129,7 @@ def test_schema_fields_without_external_table_should_execute_successfully(self,
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
)
Expand Down

0 comments on commit af4627f

Please sign in to comment.