-
Notifications
You must be signed in to change notification settings - Fork 368
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(data-classes): support for code pipeline job event (#416)
- Loading branch information
Michael Brewer
committed
May 17, 2021
1 parent
93fad02
commit 22754d3
Showing
6 changed files
with
475 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
236 changes: 236 additions & 0 deletions
236
aws_lambda_powertools/utilities/data_classes/code_pipeline_job_event.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,236 @@ | ||
import json | ||
import tempfile | ||
import zipfile | ||
from typing import Any, Dict, List, Optional | ||
from urllib.parse import unquote_plus | ||
|
||
import boto3 | ||
|
||
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper | ||
|
||
|
||
class CodePipelineConfiguration(DictWrapper): | ||
@property | ||
def function_name(self) -> str: | ||
"""Function name""" | ||
return self["FunctionName"] | ||
|
||
@property | ||
def user_parameters(self) -> str: | ||
"""User parameters""" | ||
return self["UserParameters"] | ||
|
||
@property | ||
def decoded_user_parameters(self) -> Dict[str, Any]: | ||
"""Json Decoded user parameters""" | ||
return json.loads(self.user_parameters) | ||
|
||
|
||
class CodePipelineActionConfiguration(DictWrapper): | ||
"""CodePipeline Action Configuration""" | ||
|
||
@property | ||
def configuration(self) -> CodePipelineConfiguration: | ||
return CodePipelineConfiguration(self["configuration"]) | ||
|
||
|
||
class CodePipelineS3Location(DictWrapper): | ||
@property | ||
def bucket_name(self) -> str: | ||
return self["bucketName"] | ||
|
||
@property | ||
def key(self) -> str: | ||
"""Raw S3 object key""" | ||
return self["objectKey"] | ||
|
||
@property | ||
def object_key(self) -> str: | ||
"""Unquote plus of the S3 object key""" | ||
return unquote_plus(self["objectKey"]) | ||
|
||
|
||
class CodePipelineLocation(DictWrapper): | ||
@property | ||
def get_type(self) -> str: | ||
"""Location type eg: S3""" | ||
return self["type"] | ||
|
||
@property | ||
def s3_location(self) -> CodePipelineS3Location: | ||
"""S3 location""" | ||
return CodePipelineS3Location(self["s3Location"]) | ||
|
||
|
||
class CodePipelineArtifact(DictWrapper): | ||
@property | ||
def name(self) -> str: | ||
"""Name""" | ||
return self["name"] | ||
|
||
@property | ||
def revision(self) -> Optional[str]: | ||
return self.get("revision") | ||
|
||
@property | ||
def location(self) -> CodePipelineLocation: | ||
return CodePipelineLocation(self["location"]) | ||
|
||
|
||
class CodePipelineArtifactCredentials(DictWrapper): | ||
@property | ||
def access_key_id(self) -> str: | ||
return self["accessKeyId"] | ||
|
||
@property | ||
def secret_access_key(self) -> str: | ||
return self["secretAccessKey"] | ||
|
||
@property | ||
def session_token(self) -> str: | ||
return self["sessionToken"] | ||
|
||
@property | ||
def expiration_time(self) -> Optional[int]: | ||
return self.get("expirationTime") | ||
|
||
|
||
class CodePipelineData(DictWrapper): | ||
"""CodePipeline Job Data""" | ||
|
||
@property | ||
def action_configuration(self) -> CodePipelineActionConfiguration: | ||
"""CodePipeline action configuration""" | ||
return CodePipelineActionConfiguration(self["actionConfiguration"]) | ||
|
||
@property | ||
def input_artifacts(self) -> List[CodePipelineArtifact]: | ||
"""Represents a CodePipeline input artifact""" | ||
return [CodePipelineArtifact(item) for item in self["inputArtifacts"]] | ||
|
||
@property | ||
def output_artifacts(self) -> List[CodePipelineArtifact]: | ||
"""Represents a CodePipeline output artifact""" | ||
return [CodePipelineArtifact(item) for item in self["outputArtifacts"]] | ||
|
||
@property | ||
def artifact_credentials(self) -> CodePipelineArtifactCredentials: | ||
"""Represents a CodePipeline artifact credentials""" | ||
return CodePipelineArtifactCredentials(self["artifactCredentials"]) | ||
|
||
@property | ||
def continuation_token(self) -> Optional[str]: | ||
"""A continuation token if continuing job""" | ||
return self.get("continuationToken") | ||
|
||
|
||
class CodePipelineJobEvent(DictWrapper): | ||
"""AWS CodePipeline Job Event | ||
Documentation: | ||
------------- | ||
- https://docs.aws.amazon.com/codepipeline/latest/userguide/actions-invoke-lambda-function.html | ||
- https://docs.aws.amazon.com/lambda/latest/dg/services-codepipeline.html | ||
""" | ||
|
||
def __init__(self, data: Dict[str, Any]): | ||
super().__init__(data) | ||
self._job = self["CodePipeline.job"] | ||
|
||
@property | ||
def get_id(self) -> str: | ||
"""Job id""" | ||
return self._job["id"] | ||
|
||
@property | ||
def account_id(self) -> str: | ||
"""Account id""" | ||
return self._job["accountId"] | ||
|
||
@property | ||
def data(self) -> CodePipelineData: | ||
"""Code pipeline jab data""" | ||
return CodePipelineData(self._job["data"]) | ||
|
||
@property | ||
def user_parameters(self) -> str: | ||
"""Action configuration user parameters""" | ||
return self.data.action_configuration.configuration.user_parameters | ||
|
||
@property | ||
def decoded_user_parameters(self) -> Dict[str, Any]: | ||
"""Json Decoded action configuration user parameters""" | ||
return self.data.action_configuration.configuration.decoded_user_parameters | ||
|
||
@property | ||
def input_bucket_name(self) -> str: | ||
"""Get the first input artifact bucket name""" | ||
return self.data.input_artifacts[0].location.s3_location.bucket_name | ||
|
||
@property | ||
def input_object_key(self) -> str: | ||
"""Get the first input artifact order key unquote plus""" | ||
return self.data.input_artifacts[0].location.s3_location.object_key | ||
|
||
def setup_s3_client(self): | ||
"""Creates an S3 client | ||
Uses the credentials passed in the event by CodePipeline. These | ||
credentials can be used to access the artifact bucket. | ||
Returns | ||
------- | ||
BaseClient | ||
An S3 client with the appropriate credentials | ||
""" | ||
return boto3.client( | ||
"s3", | ||
aws_access_key_id=self.data.artifact_credentials.access_key_id, | ||
aws_secret_access_key=self.data.artifact_credentials.secret_access_key, | ||
aws_session_token=self.data.artifact_credentials.session_token, | ||
) | ||
|
||
def find_input_artifact(self, artifact_name: str) -> Optional[CodePipelineArtifact]: | ||
"""Find an input artifact by artifact name | ||
Parameters | ||
---------- | ||
artifact_name : str | ||
The name of the input artifact to look for | ||
Returns | ||
------- | ||
CodePipelineArtifact, None | ||
Matching CodePipelineArtifact if found | ||
""" | ||
for artifact in self.data.input_artifacts: | ||
if artifact.name == artifact_name: | ||
return artifact | ||
return None | ||
|
||
def get_artifact(self, artifact_name: str, filename: str) -> Optional[str]: | ||
"""Get a file within an artifact zip on s3 | ||
Parameters | ||
---------- | ||
artifact_name : str | ||
Name of the S3 artifact to download | ||
filename : str | ||
The file name within the artifact zip to extract as a string | ||
Returns | ||
------- | ||
str, None | ||
Returns the contents file contents as a string | ||
""" | ||
artifact = self.find_input_artifact(artifact_name) | ||
if artifact is None: | ||
return None | ||
|
||
with tempfile.NamedTemporaryFile() as tmp_file: | ||
s3 = self.setup_s3_client() | ||
bucket = artifact.location.s3_location.bucket_name | ||
key = artifact.location.s3_location.key | ||
s3.download_file(bucket, key, tmp_file.name) | ||
with zipfile.ZipFile(tmp_file.name, "r") as zip_file: | ||
return zip_file.read(filename).decode("UTF-8") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
{ | ||
"CodePipeline.job": { | ||
"id": "11111111-abcd-1111-abcd-111111abcdef", | ||
"accountId": "111111111111", | ||
"data": { | ||
"actionConfiguration": { | ||
"configuration": { | ||
"FunctionName": "MyLambdaFunctionForAWSCodePipeline", | ||
"UserParameters": "some-input-such-as-a-URL" | ||
} | ||
}, | ||
"inputArtifacts": [ | ||
{ | ||
"name": "ArtifactName", | ||
"revision": null, | ||
"location": { | ||
"type": "S3", | ||
"s3Location": { | ||
"bucketName": "the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890", | ||
"objectKey": "the name of the application, for example CodePipelineDemoApplication.zip" | ||
} | ||
} | ||
} | ||
], | ||
"outputArtifacts": [], | ||
"artifactCredentials": { | ||
"accessKeyId": "AKIAIOSFODNN7EXAMPLE", | ||
"secretAccessKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", | ||
"sessionToken": "MIICiTCCAfICCQD6m7oRw0uXOjANBgkqhkiG9w0BAQUFADCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wHhcNMTEwNDI1MjA0NTIxWhcNMTIwNDI0MjA0NTIxWjCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMaK0dn+a4GmWIWJ21uUSfwfEvySWtC2XADZ4nB+BLYgVIk60CpiwsZ3G93vUEIO3IyNoH/f0wYK8m9TrDHudUZg3qX4waLG5M43q7Wgc/MbQITxOUSQv7c7ugFFDzQGBzZswY6786m86gpEIbb3OhjZnzcvQAaRHhdlQWIMm2nrAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAtCu4nUhVVxYUntneD9+h8Mg9q6q+auNKyExzyLwaxlAoo7TJHidbtS4J5iNmZgXL0FkbFFBjvSfpJIlJ00zbhNYS5f6GuoEDmFJl0ZxBHjJnyp378OD8uTs7fLvjx79LjSTbNYiytVbZPQUQ5Yaxu2jXnimvw3rrszlaEXAMPLE=" | ||
}, | ||
"continuationToken": "A continuation token if continuing job" | ||
} | ||
} | ||
} |
Oops, something went wrong.