from __future__ import absolute_import, division, print_function, unicode_literals
import jmespath
import six
class CloudWatchEvents(object):
"""A mapping of events to resource types."""
# **These are just shortcuts**, you can use the policy definition to
# subscribe to any arbitrary cloud trail event that corresponds to
# a custodian resource.
# For common events that we want to match, just keep a short mapping.
# Users can specify arbitrary cloud watch events by specifying these
# values in their config, but keep the common case simple.
trail_events = {
# event source, resource type as keys, mapping to api call and
# jmespath expression
'ConsoleLogin': {
'ids': 'userIdentity.arn',
'source': ''},
'CreateAutoScalingGroup': {
'ids': 'requestParameters.autoScalingGroupName',
'source': ''},
'UpdateAutoScalingGroup': {
'ids': 'requestParameters.autoScalingGroupName',
'source': ''},
'CreateBucket': {
'ids': 'requestParameters.bucketName',
'source': ''},
'CreateCluster': {
'ids': 'requestParameters.clusterIdentifier',
'source': ''},
'CreateLoadBalancer': {
'ids': 'requestParameters.loadBalancerName',
'source': ''},
'CreateLoadBalancerPolicy': {
'ids': 'requestParameters.loadBalancerName',
'source': ''},
'CreateDBInstance': {
'ids': 'requestParameters.dBInstanceIdentifier',
'source': ''},
'CreateVolume': {
'ids': 'responseElements.volumeId',
'source': ''},
'SetLoadBalancerPoliciesOfListener': {
'ids': 'requestParameters.loadBalancerName',
'source': ''},
'CreateElasticsearchDomain': {
'ids': 'requestParameters.domainName',
'source': ''},
'CreateTable': {
'ids': 'requestParameters.tableName',
'source': ''},
'RunInstances': {
'ids': 'responseElements.instancesSet.items[].instanceId',
'source': ''}}
def get(cls, event_name):
return cls.trail_events.get(event_name)
def match(cls, event):
"""Match a given cwe event as cloudtrail with an api call
That has its information filled out.
if 'detail' not in event:
return False
if 'eventName' not in event['detail']:
return False
k = event['detail']['eventName']
# We want callers to use a compiled expression, but want to avoid
# initialization cost of doing it without cause. Not thread safe,
# but usage context is lambda entry.
if k in cls.trail_events:
v = dict(cls.trail_events[k])
if isinstance(v['ids'], six.string_types):
v['ids'] = e = jmespath.compile('detail.%s' % v['ids'])
cls.trail_events[k]['ids'] = e
return v
return False
def get_trail_ids(cls, event, mode):
"""extract resources ids from a cloud trail event."""
resource_ids = ()
event_name = event['detail']['eventName']
event_source = event['detail']['eventSource']
for e in mode.get('events', []):
if not isinstance(e, dict):
# Check if we have a short cut / alias
info = CloudWatchEvents.match(event)
if info:
return info['ids'].search(event)
if event_name != e.get('event'):
if event_source != e.get('source'):
id_query = e.get('ids')
if not id_query:
raise ValueError("No id query configured")
evt = event
# be forgiving for users specifying with details or without
if not id_query.startswith('detail.'):
evt = event.get('detail', {})
resource_ids =, evt)
if resource_ids:
return resource_ids
def get_ids(cls, event, mode):
mode_type = mode.get('type')
if mode_type == 'ec2-instance-state':
resource_ids = [event.get('detail', {}).get('instance-id')]
elif mode_type == 'asg-instance-state':
resource_ids = [event.get('detail', {}).get('AutoScalingGroupName')]
elif mode_type != 'cloudtrail':
return None
resource_ids = cls.get_trail_ids(event, mode)
if not isinstance(resource_ids, (tuple, list)):
resource_ids = [resource_ids]
return list(filter(None, resource_ids))