Skip to content

Commit

Permalink
Fixing ParamValidationError when executing load_file in Glue hooks/operators (#16012)
Browse files Browse the repository at this point in the history

* Fixing ParamValidationError when executing load_file in Glue hooks/operators

Co-authored-by: Rahul Raina <raina_rahul@singaporeair.com.sg>
  • Loading branch information
rahulraina7 and Rahul Raina committed Aug 10, 2021
1 parent dec1e9d commit 77c4325
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 22 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/hooks/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def get_or_create_glue_job(self) -> str:
Name=self.job_name,
Description=self.desc,
LogUri=s3_log_path,
Role=execution_role['Role']['RoleName'],
Role=execution_role['Role']['Arn'],
ExecutionProperty={"MaxConcurrentRuns": self.concurrent_run_limit},
Command={"Name": "glueetl", "ScriptLocation": self.script_location},
MaxRetries=self.retry_limit,
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/amazon/aws/hooks/glue_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ def create_crawler(self, **crawler_kwargs) -> str:
"""
crawler_name = crawler_kwargs['Name']
self.log.info("Creating crawler: %s", crawler_name)
return self.glue_client.create_crawler(**crawler_kwargs)['Crawler']['Name']
crawler = self.glue_client.create_crawler(**crawler_kwargs)
return crawler

def start_crawler(self, crawler_name: str) -> dict:
"""
Expand Down
9 changes: 5 additions & 4 deletions airflow/providers/amazon/aws/operators/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,16 @@ def execute(self, context):
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
script_name = os.path.basename(self.script_location)
s3_hook.load_file(
filename=self.script_location,
key=self.s3_artifacts_prefix + script_name,
bucket_name=self.s3_bucket,
self.script_location, self.s3_artifacts_prefix + script_name, bucket_name=self.s3_bucket
)
s3_script_location = f"s3://{self.s3_bucket}/{self.s3_artifacts_prefix}{script_name}"
else:
s3_script_location = self.script_location
glue_job = AwsGlueJobHook(
job_name=self.job_name,
desc=self.job_desc,
concurrent_run_limit=self.concurrent_run_limit,
script_location=self.script_location,
script_location=s3_script_location,
retry_limit=self.retry_limit,
num_of_dpus=self.num_of_dpus,
aws_conn_id=self.aws_conn_id,
Expand Down
3 changes: 3 additions & 0 deletions tests/providers/amazon/aws/hooks/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def test_get_iam_execution_role(self):
)
iam_role = hook.get_iam_execution_role()
assert iam_role is not None
assert "Role" in iam_role
assert "Arn" in iam_role['Role']
assert iam_role['Role']['Arn'] == "arn:aws:iam::123456789012:role/my_test_role"

@mock.patch.object(AwsGlueJobHook, "get_iam_execution_role")
@mock.patch.object(AwsGlueJobHook, "get_conn")
Expand Down
5 changes: 3 additions & 2 deletions tests/providers/amazon/aws/hooks/test_glue_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ def test_update_crawler_not_needed(self, mock_get_conn):
def test_create_crawler(self, mock_get_conn):
mock_get_conn.return_value.create_crawler.return_value = {'Crawler': {'Name': mock_crawler_name}}
glue_crawler = self.hook.create_crawler(**mock_config)

self.assertEqual(glue_crawler, mock_crawler_name)
self.assertIn("Crawler", glue_crawler)
self.assertIn("Name", glue_crawler["Crawler"])
self.assertEqual(glue_crawler["Crawler"]["Name"], mock_crawler_name)

@mock.patch.object(AwsGlueCrawlerHook, "get_conn")
def test_start_crawler(self, mock_get_conn):
Expand Down
34 changes: 20 additions & 14 deletions tests/providers/amazon/aws/operators/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import unittest
from unittest import mock

from parameterized import parameterized

from airflow import configuration
from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
Expand All @@ -30,27 +32,31 @@ def setUp(self, glue_hook_mock):
configuration.load_test_config()

self.glue_hook_mock = glue_hook_mock
some_script = "s3:/glue-examples/glue-scripts/sample_aws_glue_job.py"
self.glue = AwsGlueJobOperator(
task_id='test_glue_operator',
job_name='my_test_job',
script_location=some_script,
aws_conn_id='aws_default',
region_name='us-west-2',
s3_bucket='some_bucket',
iam_role_name='my_test_role',
)

@parameterized.expand(
[
"s3://glue-examples/glue-scripts/sample_aws_glue_job.py",
"/glue-examples/glue-scripts/sample_aws_glue_job.py",
]
)
@mock.patch.object(AwsGlueJobHook, 'get_job_state')
@mock.patch.object(AwsGlueJobHook, 'initialize_job')
@mock.patch.object(AwsGlueJobHook, "get_conn")
@mock.patch.object(S3Hook, "load_file")
def test_execute_without_failure(
self, mock_load_file, mock_get_conn, mock_initialize_job, mock_get_job_state
self, script_location, mock_load_file, mock_get_conn, mock_initialize_job, mock_get_job_state
):
glue = AwsGlueJobOperator(
task_id='test_glue_operator',
job_name='my_test_job',
script_location=script_location,
aws_conn_id='aws_default',
region_name='us-west-2',
s3_bucket='some_bucket',
iam_role_name='my_test_role',
)
mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'}
mock_get_job_state.return_value = 'SUCCEEDED'
self.glue.execute(None)

glue.execute(None)
mock_initialize_job.assert_called_once_with({})
assert self.glue.job_name == 'my_test_job'
assert glue.job_name == 'my_test_job'

0 comments on commit 77c4325

Please sign in to comment.