import json
import boto3
from datetime import datetime
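# Source prefixes that mean "any source" for an Azure NSG rule. '*' and
# '0.0.0.0/0' are the values the rule originally matched; '::/0' is the IPv6
# equivalent and 'Internet' is the Azure service tag for public-internet traffic.
# A tuple (not a set) so membership uses ==, which tolerates non-hashable values.
_ANY_SOURCE_PREFIXES = ('*', '0.0.0.0/0', '::/0', 'Internet')

# The port this rule guards: SSH.
_SSH_PORT = 22


def _port_range_includes(port_range, port):
    """Return True if the NSG port specification *port_range* covers *port*.

    Handles the forms Azure NSG rules use for a destination port range: a
    single port ('22'), the wildcard '*', an inclusive range ('20-30'), or a
    comma-separated list of those ('22,443,1000-2000'). Anything that does not
    parse is treated as not matching, which is what the original exact string
    comparison did for everything other than '22'.

    Args:
        port_range: The rule's destination port specification (str or int).
        port: The port number to look for.

    Returns:
        bool: True if *port* falls inside *port_range*.
    """
    if port_range is None:
        return False
    for fragment in str(port_range).split(','):
        fragment = fragment.strip()
        if fragment == '*':
            return True
        low, dash, high = fragment.partition('-')
        try:
            if dash:
                if int(low) <= port <= int(high):
                    return True
            elif int(low) == port:
                return True
        except ValueError:
            # Unparsable fragment (e.g. a named port); skip rather than guess.
            continue
    return False


def _rule_allows_world_ssh(rule):
    """Return True if *rule* is an inbound Allow rule exposing SSH to any source.

    The four keys are read in the same order as the original check so that a
    rule missing one of them raises KeyError in exactly the same situations.

    Args:
        rule: One entry of the security group's ``SecurityRules`` list.

    Returns:
        bool: True when the rule matches the non-compliant pattern.
    """
    return (
        _port_range_includes(rule['SecurityRuleDestinationPortRange'], _SSH_PORT)
        and rule['SecurityRuleSourceAddressPrefix'] in _ANY_SOURCE_PREFIXES
        and rule['SecurityRuleAccess'] == 'Allow'
        and rule['SecurityRuleDirection'] == 'Inbound'
    )


def lambda_handler(event, context):
    """AWS Config custom-rule handler: flag security groups that allow SSH from anywhere.

    Reads the configuration item from the invoking event, fetches the most
    recent recorded configuration of the security group from AWS Config, and
    reports COMPLIANT or NON_COMPLIANT through ``put_evaluations``.

    Args:
        event: AWS Config rule invocation event. Uses ``event['invokingEvent']``
            (a JSON string holding ``configurationItem``) and
            ``event['resultToken']``.
        context: Lambda context object; unused.

    Returns:
        None. The verdict is delivered via ``config.put_evaluations``.

    Raises:
        ValueError: if AWS Config has no configuration history for the resource.
        Any exception raised while parsing the event or calling AWS Config is
        logged and re-raised, so a failure is recorded as such instead of being
        silently reported as compliant.
    """
    # Parse the invoking event string as JSON
    invoking_event = json.loads(event['invokingEvent'])
    try:
        # Initialize AWS Config client
        config = boto3.client('config')
        configuration_item = invoking_event['configurationItem']
        group_name = configuration_item['resourceId']

        # Ask explicitly for the newest item first so indexing [0] below is
        # not dependent on the API's default ordering; one item is all we use.
        response = config.get_resource_config_history(
            resourceType='AzureTest::VM::SecurityGroup',
            resourceId=group_name,
            chronologicalOrder='Reverse',
            limit=1
        )
        history = response['configurationItems']
        if not history:
            # Fail loudly: evaluating without a configuration would either crash
            # with an opaque IndexError or risk a fail-open verdict.
            raise ValueError(f"No configuration history found for resource {group_name}")
        current_config = json.loads(history[0]['configuration'])

        # Stop at the first offending rule, as the original loop did.
        # NOTE(review): a missing 'SecurityRules' key still raises KeyError on
        # purpose — treating it as "no rules" would be a fail-open default.
        exposed = any(_rule_allows_world_ssh(rule) for rule in current_config['SecurityRules'])

        if exposed:
            compliance_type = 'NON_COMPLIANT'
            annotation = 'The security group contains a rule that allows ingress from * (any) to port 22.'
        else:
            compliance_type = 'COMPLIANT'
            annotation = 'The security group does not contain any rules that allow ingress from * (any) to port 22.'

        # Single evaluation call for both outcomes; only type and annotation differ.
        config.put_evaluations(
            Evaluations=[
                {
                    'ComplianceResourceType': configuration_item['resourceType'],
                    'ComplianceResourceId': group_name,
                    'ComplianceType': compliance_type,
                    'Annotation': annotation,
                    'OrderingTimestamp': datetime.now()
                }
            ],
            ResultToken=event['resultToken']
        )
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        # Bare re-raise keeps the original traceback intact.
        raise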