Skip to content

Commit

Permalink
Move validation of templated input params to run after the context in…
Browse files Browse the repository at this point in the history
…it (#19048)

* Fix #14682, move input params validation into `execute()`

* Adjust tests for LocalFilesystemToS3Operator
  • Loading branch information
eskarimov committed Oct 27, 2021
1 parent efdfd15 commit 3c08c02
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 11 deletions.
2 changes: 2 additions & 0 deletions airflow/providers/amazon/aws/transfers/local_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ def __init__(
self.gzip = gzip
self.acl_policy = acl_policy

def _check_inputs(self):
if 's3://' in self.dest_key and self.dest_bucket is not None:
raise TypeError('dest_bucket should be None when dest_key is provided as a full s3:// file path.')

def execute(self, context):
self._check_inputs()
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
s3_hook.load_file(
self.filename,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ def __init__(
self.gzip = gzip
self.google_impersonation_chain = google_impersonation_chain

if dest_gcs and not gcs_object_is_directory(self.dest_gcs):
def _check_inputs(self) -> None:
if self.dest_gcs and not gcs_object_is_directory(self.dest_gcs):
self.log.info(
'Destination Google Cloud Storage path is not a valid '
'"directory", define a path that ends with a slash "/" or '
Expand All @@ -114,6 +115,7 @@ def __init__(
)

def execute(self, context):
self._check_inputs()
azure_fileshare_hook = AzureFileShareHook(self.azure_fileshare_conn_id)
files = azure_fileshare_hook.list_files(
share_name=self.share_name, directory_name=self.directory_name
Expand Down
4 changes: 3 additions & 1 deletion airflow/providers/google/cloud/transfers/s3_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ def __init__(
self.gzip = gzip
self.google_impersonation_chain = google_impersonation_chain

if dest_gcs and not gcs_object_is_directory(self.dest_gcs):
def _check_inputs(self) -> None:
if self.dest_gcs and not gcs_object_is_directory(self.dest_gcs):
self.log.info(
'Destination Google Cloud Storage path is not a valid '
'"directory", define a path that ends with a slash "/" or '
Expand All @@ -158,6 +159,7 @@ def __init__(
)

def execute(self, context):
self._check_inputs()
# use the super method to list all the files in an S3 bucket/key
files = super().execute(context)

Expand Down
19 changes: 10 additions & 9 deletions tests/providers/amazon/aws/transfers/test_local_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,17 @@ def test_init(self):
assert operator.encrypt == self._config['encrypt']
assert operator.gzip == self._config['gzip']

def test_init_exception(self):
def test_execute_exception(self):
operator = LocalFilesystemToS3Operator(
task_id='file_to_s3_operatro_exception',
dag=self.dag,
filename=self.testfile1,
dest_key=f's3://dummy/{self.dest_key}',
dest_bucket=self.dest_bucket,
**self._config,
)
with self.assertRaises(TypeError):
LocalFilesystemToS3Operator(
task_id='file_to_s3_operatro_exception',
dag=self.dag,
filename=self.testfile1,
dest_key=f's3://dummy/{self.dest_key}',
dest_bucket=self.dest_bucket,
**self._config,
)
operator.execute(None)

@mock_s3
def test_execute(self):
Expand Down

0 comments on commit 3c08c02

Please sign in to comment.