diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py
index cbdb6575663fd..309955b94d368 100644
--- a/airflow/providers/amazon/aws/hooks/glue.py
+++ b/airflow/providers/amazon/aws/hooks/glue.py
@@ -100,22 +100,16 @@ def __init__(
         super().__init__(*args, **kwargs)
 
     def create_glue_job_config(self) -> dict:
-        if self.s3_bucket is None:
-            raise ValueError("Could not initialize glue job, error: Specify Parameter `s3_bucket`")
-
         default_command = {
             "Name": "glueetl",
             "ScriptLocation": self.script_location,
         }
         command = self.create_job_kwargs.pop("Command", default_command)
-
-        s3_log_path = f"s3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}"
         execution_role = self.get_iam_execution_role()
 
-        ret_config = {
+        config = {
             "Name": self.job_name,
             "Description": self.desc,
-            "LogUri": s3_log_path,
             "Role": execution_role["Role"]["Arn"],
             "ExecutionProperty": {"MaxConcurrentRuns": self.concurrent_run_limit},
             "Command": command,
@@ -124,9 +118,12 @@ def create_glue_job_config(self) -> dict:
         }
 
         if hasattr(self, "num_of_dpus"):
-            ret_config["MaxCapacity"] = self.num_of_dpus
+            config["MaxCapacity"] = self.num_of_dpus
+
+        if self.s3_bucket is not None:
+            config["LogUri"] = f"s3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}"
 
-        return ret_config
+        return config
 
     def list_jobs(self) -> list:
         """
diff --git a/tests/providers/amazon/aws/hooks/test_glue.py b/tests/providers/amazon/aws/hooks/test_glue.py
index 4c64728dc81fc..b542d797c9f24 100644
--- a/tests/providers/amazon/aws/hooks/test_glue.py
+++ b/tests/providers/amazon/aws/hooks/test_glue.py
@@ -136,6 +136,54 @@ class JobNotFoundException(Exception):
         mock_get_conn.return_value.update_job.assert_not_called()
         assert result == expected_job_name
 
+    @mock.patch.object(GlueJobHook, "get_iam_execution_role")
+    @mock.patch.object(GlueJobHook, "get_conn")
+    def test_create_or_update_glue_job_create_new_job_without_s3_bucket(
+        self, mock_get_conn, mock_get_iam_execution_role
+    ):
+        """
+        Calls 'create_or_update_glue_job' with no existing job.
+        Should create a new job.
+        """
+
+        class JobNotFoundException(Exception):
+            pass
+
+        expected_job_name = "aws_test_glue_job"
+        job_description = "This is test case job from Airflow"
+        role_name = "my_test_role"
+        role_name_arn = "test_role"
+
+        mock_get_conn.return_value.exceptions.EntityNotFoundException = JobNotFoundException
+        mock_get_conn.return_value.get_job.side_effect = JobNotFoundException()
+        mock_get_iam_execution_role.return_value = {"Role": {"RoleName": role_name, "Arn": role_name_arn}}
+
+        hook = GlueJobHook(
+            job_name=expected_job_name,
+            desc=job_description,
+            concurrent_run_limit=2,
+            retry_limit=3,
+            num_of_dpus=5,
+            iam_role_name=role_name,
+            create_job_kwargs={"Command": {}},
+            region_name=self.some_aws_region,
+        )
+
+        result = hook.create_or_update_glue_job()
+
+        mock_get_conn.return_value.get_job.assert_called_once_with(JobName=expected_job_name)
+        mock_get_conn.return_value.create_job.assert_called_once_with(
+            Command={},
+            Description=job_description,
+            ExecutionProperty={"MaxConcurrentRuns": 2},
+            MaxCapacity=5,
+            MaxRetries=3,
+            Name=expected_job_name,
+            Role=role_name_arn,
+        )
+        mock_get_conn.return_value.update_job.assert_not_called()
+        assert result == expected_job_name
+
     @mock.patch.object(GlueJobHook, "get_iam_execution_role")
     @mock.patch.object(GlueJobHook, "get_conn")
     def test_create_or_update_glue_job_update_existing_job(self, mock_get_conn, mock_get_iam_execution_role):
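
Illustrative sketch (not part of the patch): with this change, the generated job config simply omits "LogUri" when no s3_bucket is supplied, rather than raising a ValueError. The snippet below stubs out get_iam_execution_role so it runs without AWS credentials; the constructor keyword arguments mirror the ones used in the test above, and the bucket name "my-bucket" is only an example.

from unittest import mock

from airflow.providers.amazon.aws.hooks.glue import GlueJobHook

# Stand-in IAM response so create_glue_job_config() never calls AWS.
fake_role = {"Role": {"RoleName": "glue-role", "Arn": "arn:aws:iam::123456789012:role/glue-role"}}

with mock.patch.object(GlueJobHook, "get_iam_execution_role", return_value=fake_role):
    # No s3_bucket: the config is still built, it just carries no "LogUri" key.
    hook = GlueJobHook(job_name="demo_job", desc="demo", iam_role_name="glue-role", num_of_dpus=5)
    assert "LogUri" not in hook.create_glue_job_config()

    # With s3_bucket: "LogUri" is assembled from the bucket, the log prefix and the job name.
    hook = GlueJobHook(
        job_name="demo_job", desc="demo", iam_role_name="glue-role", num_of_dpus=5, s3_bucket="my-bucket"
    )
    assert hook.create_glue_job_config()["LogUri"].startswith("s3://my-bucket/")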