Skip to content

Commit

Permalink
Adding support to put extra arguments for Glue Job. (#14027)
Browse files (browse the repository at this point in the history)
  • Loading branch information
piffall committed Feb 27, 2021
1 parent bfef559 commit 13854c3
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
9 changes: 7 additions & 2 deletions airflow/providers/amazon/aws/hooks/glue.py
Expand Up @@ -43,8 +43,10 @@ class AwsGlueJobHook(AwsBaseHook):
:type num_of_dpus: int
:param region_name: aws region name (example: us-east-1)
:type region_name: Optional[str]
- :param iam_role_name: AWS IAM Role for Glue Job
+ :param iam_role_name: AWS IAM Role for Glue Job Execution
:type iam_role_name: Optional[str]
:param create_job_kwargs: Extra arguments for Glue Job Creation
:type create_job_kwargs: Optional[dict]
"""

JOB_POLL_INTERVAL = 6 # polls job status after every JOB_POLL_INTERVAL seconds
Expand All @@ -60,9 +62,10 @@ def __init__(
num_of_dpus: int = 10,
region_name: Optional[str] = None,
iam_role_name: Optional[str] = None,
create_job_kwargs: Optional[dict] = None,
*args,
**kwargs,
- ):
+ ): # pylint: disable=too-many-arguments
self.job_name = job_name
self.desc = desc
self.concurrent_run_limit = concurrent_run_limit
Expand All @@ -73,6 +76,7 @@ def __init__(
self.s3_bucket = s3_bucket
self.role_name = iam_role_name
self.s3_glue_logs = 'logs/glue-logs/'
self.create_job_kwargs = create_job_kwargs or {}
kwargs['client_type'] = 'glue'
super().__init__(*args, **kwargs)

Expand Down Expand Up @@ -181,6 +185,7 @@ def get_or_create_glue_job(self) -> str:
Command={"Name": "glueetl", "ScriptLocation": self.script_location},
MaxRetries=self.retry_limit,
AllocatedCapacity=self.num_of_dpus,
**self.create_job_kwargs,
)
return create_job_response['Name']
except Exception as general_error:
Expand Down
5 changes: 5 additions & 0 deletions airflow/providers/amazon/aws/operators/glue.py
Expand Up @@ -51,6 +51,8 @@ class AwsGlueJobOperator(BaseOperator):
:type s3_bucket: Optional[str]
:param iam_role_name: AWS IAM Role for Glue Job Execution
:type iam_role_name: Optional[str]
:param create_job_kwargs: Extra arguments for Glue Job Creation
:type create_job_kwargs: Optional[dict]
"""

template_fields = ()
Expand All @@ -72,6 +74,7 @@ def __init__(
region_name: Optional[str] = None,
s3_bucket: Optional[str] = None,
iam_role_name: Optional[str] = None,
create_job_kwargs: Optional[dict] = None,
**kwargs,
): # pylint: disable=too-many-arguments
super().__init__(**kwargs)
Expand All @@ -88,6 +91,7 @@ def __init__(
self.iam_role_name = iam_role_name
self.s3_protocol = "s3://"
self.s3_artifacts_prefix = 'artifacts/glue-scripts/'
self.create_job_kwargs = create_job_kwargs

def execute(self, context):
"""
Expand All @@ -110,6 +114,7 @@ def execute(self, context):
region_name=self.region_name,
s3_bucket=self.s3_bucket,
iam_role_name=self.iam_role_name,
create_job_kwargs=self.create_job_kwargs,
)
self.log.info("Initializing AWS Glue Job: %s", self.job_name)
glue_job_run = glue_job.initialize_job(self.script_args)
Expand Down

0 comments on commit 13854c3

Please sign in to comment.