Skip to content

Commit

Permalink
AWS Glue job hook: Make s3_bucket parameter optional (#29659)
Browse files Browse the repository at this point in the history
* AWS Glue job hook: Make s3_bucket optional
  • Loading branch information
Romain Ardiet committed Feb 22, 2023
1 parent 9de301d commit 6c13f04
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 9 deletions.
15 changes: 6 additions & 9 deletions airflow/providers/amazon/aws/hooks/glue.py
Expand Up @@ -100,22 +100,16 @@ def __init__(
super().__init__(*args, **kwargs)

def create_glue_job_config(self) -> dict:
if self.s3_bucket is None:
raise ValueError("Could not initialize glue job, error: Specify Parameter `s3_bucket`")

default_command = {
"Name": "glueetl",
"ScriptLocation": self.script_location,
}
command = self.create_job_kwargs.pop("Command", default_command)

s3_log_path = f"s3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}"
execution_role = self.get_iam_execution_role()

ret_config = {
config = {
"Name": self.job_name,
"Description": self.desc,
"LogUri": s3_log_path,
"Role": execution_role["Role"]["Arn"],
"ExecutionProperty": {"MaxConcurrentRuns": self.concurrent_run_limit},
"Command": command,
Expand All @@ -124,9 +118,12 @@ def create_glue_job_config(self) -> dict:
}

if hasattr(self, "num_of_dpus"):
ret_config["MaxCapacity"] = self.num_of_dpus
config["MaxCapacity"] = self.num_of_dpus

if self.s3_bucket is not None:
config["LogUri"] = f"s3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}"

return ret_config
return config

def list_jobs(self) -> list:
"""
Expand Down
48 changes: 48 additions & 0 deletions tests/providers/amazon/aws/hooks/test_glue.py
Expand Up @@ -136,6 +136,54 @@ class JobNotFoundException(Exception):
mock_get_conn.return_value.update_job.assert_not_called()
assert result == expected_job_name

@mock.patch.object(GlueJobHook, "get_iam_execution_role")
@mock.patch.object(GlueJobHook, "get_conn")
def test_create_or_update_glue_job_create_new_job_without_s3_bucket(
self, mock_get_conn, mock_get_iam_execution_role
):
"""
Calls 'create_or_update_glue_job' with no existing job.
Should create a new job.
"""

class JobNotFoundException(Exception):
pass

expected_job_name = "aws_test_glue_job"
job_description = "This is test case job from Airflow"
role_name = "my_test_role"
role_name_arn = "test_role"

mock_get_conn.return_value.exceptions.EntityNotFoundException = JobNotFoundException
mock_get_conn.return_value.get_job.side_effect = JobNotFoundException()
mock_get_iam_execution_role.return_value = {"Role": {"RoleName": role_name, "Arn": role_name_arn}}

hook = GlueJobHook(
job_name=expected_job_name,
desc=job_description,
concurrent_run_limit=2,
retry_limit=3,
num_of_dpus=5,
iam_role_name=role_name,
create_job_kwargs={"Command": {}},
region_name=self.some_aws_region,
)

result = hook.create_or_update_glue_job()

mock_get_conn.return_value.get_job.assert_called_once_with(JobName=expected_job_name)
mock_get_conn.return_value.create_job.assert_called_once_with(
Command={},
Description=job_description,
ExecutionProperty={"MaxConcurrentRuns": 2},
MaxCapacity=5,
MaxRetries=3,
Name=expected_job_name,
Role=role_name_arn,
)
mock_get_conn.return_value.update_job.assert_not_called()
assert result == expected_job_name

@mock.patch.object(GlueJobHook, "get_iam_execution_role")
@mock.patch.object(GlueJobHook, "get_conn")
def test_create_or_update_glue_job_update_existing_job(self, mock_get_conn, mock_get_iam_execution_role):
Expand Down

0 comments on commit 6c13f04

Please sign in to comment.