Skip to content

Commit

Permalink
Add Python and Glue version parameters, add check mode (ansible-colle…
Browse files Browse the repository at this point in the history
…ctions#480)

Add Python and Glue version parameters, add check mode

SUMMARY

Add parameters for Python version and Glue version.
Available Python and Glue version can be found here:
https://docs.aws.amazon.com/glue/latest/dg/add-job.html

ISSUE TYPE


Feature Pull Request

COMPONENT NAME

aws_glue_job
ADDITIONAL INFORMATION



Example:
community.aws.aws_glue_job:
  - name: my-job
    description: My test job
    command_script_location: my-s3-bucket/script.py
    command_python_version: 3
    glue_version: "2.0"
    role: MyGlueJobRole
    state: present

Reviewed-by: Mark Chappell <None>
Reviewed-by: Ivan Chekaldin <None>
Reviewed-by: Jill R <None>
Reviewed-by: Alina Buzachis <None>
Reviewed-by: None <None>
  • Loading branch information
ichekaldin authored and abikouo committed Sep 18, 2023
1 parent da2f980 commit 1f93316
Showing 1 changed file with 121 additions and 30 deletions.
151 changes: 121 additions & 30 deletions aws_glue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
- The name of the job command. This must be 'glueetl'.
default: glueetl
type: str
command_python_version:
description:
- Python version being used to execute a Python shell job.
- AWS currently supports C('2') or C('3').
type: str
version_added: 2.2.0
command_script_location:
description:
- The S3 path to a script that executes a job.
Expand All @@ -47,6 +53,11 @@
description:
- Description of the job being defined.
type: str
glue_version:
description:
- Glue version determines the versions of Apache Spark and Python that AWS Glue supports.
type: str
version_added: 1.5.0
max_concurrent_runs:
description:
- The maximum number of concurrent runs allowed for the job. The default is 1. An error is returned when
Expand All @@ -61,6 +72,18 @@
- The name you assign to this job definition. It must be unique in your account.
required: true
type: str
number_of_workers:
description:
- The number of workers of a defined workerType that are allocated when a job runs.
type: int
version_added: 1.5.0
purge_tags:
description:
- If C(true), existing tags will be purged from the resource to match exactly what is defined by I(tags) parameter.
- If the I(tags) parameter is not set then tags will not be modified.
default: true
type: bool
version_added: 2.2.0
role:
description:
- The name or ARN of the IAM role associated with this job.
Expand All @@ -72,26 +95,22 @@
required: true
choices: [ 'present', 'absent' ]
type: str
tags:
description:
- A hash/dictionary of tags to be applied to the job.
- Remove completely or specify an empty dictionary to remove all tags.
type: dict
version_added: 2.2.0
timeout:
description:
- The job timeout in minutes.
type: int
glue_version:
description:
- Glue version determines the versions of Apache Spark and Python that AWS Glue supports.
type: str
version_added: 1.5.0
worker_type:
description:
- The type of predefined worker that is allocated when a job runs.
choices: [ 'Standard', 'G.1X', 'G.2X' ]
type: str
version_added: 1.5.0
number_of_workers:
description:
- The number of workers of a defined workerType that are allocated when a job runs.
type: int
version_added: 1.5.0
extends_documentation_fragment:
- amazon.aws.aws
- amazon.aws.ec2
Expand All @@ -103,7 +122,10 @@
# Create an AWS Glue job
- community.aws.aws_glue_job:
command_script_location: s3bucket/script.py
command_script_location: "s3://s3bucket/script.py"
default_arguments:
"--extra-py-files": s3://s3bucket/script-package.zip
"--TempDir": "s3://s3bucket/temp/"
name: my-glue-job
role: my-iam-role
state: present
Expand Down Expand Up @@ -138,6 +160,11 @@
returned: when state is present
type: str
sample: mybucket/myscript.py
python_version:
description: Specifies the Python version.
returned: when state is present
type: str
sample: 3
connections:
description: The connections used for this job.
returned: when state is present
Expand All @@ -158,6 +185,11 @@
returned: when state is present
type: str
sample: My first Glue job
glue_version:
description: Glue version.
returned: when state is present
type: str
sample: 2.0
job_name:
description: The name of the AWS Glue job.
returned: always
Expand Down Expand Up @@ -213,6 +245,11 @@

from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule
from ansible_collections.amazon.aws.plugins.module_utils.core import is_boto3_error_code
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import ansible_dict_to_boto3_tag_list
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import AWSRetry
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import boto3_tag_list_to_ansible_dict
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import compare_aws_tags
from ansible_collections.amazon.aws.plugins.module_utils.iam import get_aws_account_info


def _get_glue_job(connection, module, glue_job_name):
Expand All @@ -224,9 +261,8 @@ def _get_glue_job(connection, module, glue_job_name):
:param glue_job_name: Name of Glue job to get
:return: boto3 Glue job dict or None if not found
"""

try:
return connection.get_job(JobName=glue_job_name)['Job']
return connection.get_job(aws_retry=True, JobName=glue_job_name)['Job']
except is_boto3_error_code('EntityNotFoundException'):
return None
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: # pylint: disable=duplicate-except
Expand All @@ -241,7 +277,6 @@ def _compare_glue_job_params(user_params, current_params):
:param current_params: the Glue job parameters currently configured
:return: True if any parameter is mismatched else False
"""

# Weirdly, boto3 doesn't return some keys if the value is empty e.g. Description
# To counter this, add the key if it's missing with a blank value

Expand All @@ -252,18 +287,25 @@ def _compare_glue_job_params(user_params, current_params):

if 'AllocatedCapacity' in user_params and user_params['AllocatedCapacity'] != current_params['AllocatedCapacity']:
return True
if 'Command' in user_params and user_params['Command']['ScriptLocation'] != current_params['Command']['ScriptLocation']:
return True
if 'Connections' in user_params and set(user_params['Connections']) != set(current_params['Connections']):
if 'Command' in user_params:
if user_params['Command']['ScriptLocation'] != current_params['Command']['ScriptLocation']:
return True
if user_params['Command']['PythonVersion'] != current_params['Command']['PythonVersion']:
return True
if 'Connections' in user_params and user_params['Connections'] != current_params['Connections']:
return True
if 'DefaultArguments' in user_params and set(user_params['DefaultArguments']) != set(current_params['DefaultArguments']):
if 'DefaultArguments' in user_params and user_params['DefaultArguments'] != current_params['DefaultArguments']:
return True
if 'Description' in user_params and user_params['Description'] != current_params['Description']:
return True
if 'ExecutionProperty' in user_params and user_params['ExecutionProperty']['MaxConcurrentRuns'] != current_params['ExecutionProperty']['MaxConcurrentRuns']:
return True
if 'GlueVersion' in user_params and user_params['GlueVersion'] != current_params['GlueVersion']:
return True
if 'MaxRetries' in user_params and user_params['MaxRetries'] != current_params['MaxRetries']:
return True
if 'Role' in user_params and user_params['Role'] != current_params['Role']:
return True
if 'Timeout' in user_params and user_params['Timeout'] != current_params['Timeout']:
return True
if 'GlueVersion' in user_params and user_params['GlueVersion'] != current_params['GlueVersion']:
Expand All @@ -276,6 +318,44 @@ def _compare_glue_job_params(user_params, current_params):
return False


def ensure_tags(connection, module, glue_job):
changed = False

if module.params.get('tags') is None:
return False

account_id, partition = get_aws_account_info(module)
arn = 'arn:{0}:glue:{1}:{2}:job/{3}'.format(partition, module.region, account_id, module.params.get('name'))

try:
existing_tags = connection.get_tags(aws_retry=True, ResourceArn=arn).get('Tags', {})
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
if module.check_mode:
existing_tags = {}
else:
module.fail_json_aws(e, msg='Unable to get tags for Glue job %s' % module.params.get('name'))

tags_to_add, tags_to_remove = compare_aws_tags(existing_tags, module.params.get('tags'), module.params.get('purge_tags'))

if tags_to_remove:
changed = True
if not module.check_mode:
try:
connection.untag_resource(aws_retry=True, ResourceArn=arn, TagsToRemove=tags_to_remove)
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
module.fail_json_aws(e, msg='Unable to set tags for Glue job %s' % module.params.get('name'))

if tags_to_add:
changed = True
if not module.check_mode:
try:
connection.tag_resource(aws_retry=True, ResourceArn=arn, TagsToAdd=tags_to_add)
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
module.fail_json_aws(e, msg='Unable to set tags for Glue job %s' % module.params.get('name'))

return changed


def create_or_update_glue_job(connection, module, glue_job):
"""
Create or update an AWS Glue job
Expand All @@ -294,12 +374,16 @@ def create_or_update_glue_job(connection, module, glue_job):
params['AllocatedCapacity'] = module.params.get("allocated_capacity")
if module.params.get("command_script_location") is not None:
params['Command'] = {'Name': module.params.get("command_name"), 'ScriptLocation': module.params.get("command_script_location")}
if module.params.get("command_python_version") is not None:
params['Command']['PythonVersion'] = module.params.get("command_python_version")
if module.params.get("connections") is not None:
params['Connections'] = {'Connections': module.params.get("connections")}
if module.params.get("default_arguments") is not None:
params['DefaultArguments'] = module.params.get("default_arguments")
if module.params.get("description") is not None:
params['Description'] = module.params.get("description")
if module.params.get("glue_version") is not None:
params['GlueVersion'] = module.params.get("glue_version")
if module.params.get("max_concurrent_runs") is not None:
params['ExecutionProperty'] = {'MaxConcurrentRuns': module.params.get("max_concurrent_runs")}
if module.params.get("max_retries") is not None:
Expand All @@ -320,22 +404,24 @@ def create_or_update_glue_job(connection, module, glue_job):
# Update job needs slightly modified params
update_params = {'JobName': params['Name'], 'JobUpdate': copy.deepcopy(params)}
del update_params['JobUpdate']['Name']
connection.update_job(**update_params)
if not module.check_mode:
connection.update_job(aws_retry=True, **update_params)
changed = True
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
module.fail_json_aws(e)
else:
try:
connection.create_job(**params)
if not module.check_mode:
connection.create_job(aws_retry=True, **params)
changed = True
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
module.fail_json_aws(e)

# If changed, get the Glue job again
if changed:
glue_job = _get_glue_job(connection, module, params['Name'])
glue_job = _get_glue_job(connection, module, params['Name'])

changed |= ensure_tags(connection, module, glue_job)

module.exit_json(changed=changed, **camel_dict_to_snake_dict(glue_job))
module.exit_json(changed=changed, **camel_dict_to_snake_dict(glue_job or {}, ignore_list=['DefaultArguments']))


def delete_glue_job(connection, module, glue_job):
Expand All @@ -347,12 +433,12 @@ def delete_glue_job(connection, module, glue_job):
:param glue_job: a dict of AWS Glue job parameters or None
:return:
"""

changed = False

if glue_job:
try:
connection.delete_job(JobName=glue_job['Name'])
if not module.check_mode:
connection.delete_job(aws_retry=True, JobName=glue_job['Name'])
changed = True
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
module.fail_json_aws(e)
Expand All @@ -366,29 +452,34 @@ def main():
dict(
allocated_capacity=dict(type='int'),
command_name=dict(type='str', default='glueetl'),
command_python_version=dict(type='str'),
command_script_location=dict(type='str'),
connections=dict(type='list', elements='str'),
default_arguments=dict(type='dict'),
description=dict(type='str'),
glue_version=dict(type='str'),
max_concurrent_runs=dict(type='int'),
max_retries=dict(type='int'),
name=dict(required=True, type='str'),
number_of_workers=dict(type='int'),
purge_tags=dict(type='bool', default=True),
role=dict(type='str'),
state=dict(required=True, choices=['present', 'absent'], type='str'),
tags=dict(type='dict'),
timeout=dict(type='int'),
glue_version=dict(type='str'),
worker_type=dict(choices=['Standard', 'G.1X', 'G.2X'], type='str'),
number_of_workers=dict(type='int'),
)
)

module = AnsibleAWSModule(argument_spec=argument_spec,
required_if=[
('state', 'present', ['role', 'command_script_location'])
]
],
supports_check_mode=True
)

connection = module.client('glue')
retry_decorator = AWSRetry.jittered_backoff(retries=10)
connection = module.client('glue', retry_decorator=retry_decorator)

state = module.params.get("state")

Expand Down

0 comments on commit 1f93316

Please sign in to comment.