-
Notifications
You must be signed in to change notification settings - Fork 14
/
lambda_traffic_mirror.py
363 lines (330 loc) · 14.1 KB
/
lambda_traffic_mirror.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
#!/usr/bin/python3
# COPYRIGHT 2021 BY EXTRAHOP NETWORKS, INC.
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
import json
import boto3
import os
from netaddr import IPNetwork, IPAddress
# Shared EC2 API client, created once at import time and used by every
# function in this module.
EC2_CLIENT = boto3.client("ec2")
# The ID of the traffic mirror filter, read from the "filter_id" environment
# variable. A missing variable raises KeyError when the module is imported.
FILTER_ID = os.environ["filter_id"]
# The IDs of the traffic mirror targets for your ExtraHop sensors.
TARGETS = ["tmt-abcdefg0123456789"]
# Determines whether the function will mirror traffic across availability
# zones. If set to True, and there are no traffic mirror targets in the
# availability zone of the source EC2 instance, the function does not create
# a mirror session.
LOCAL_ZONE_ONLY = True
def create_mirror(interface_id, targets):
    """
    Method that creates a traffic mirror session for a network interface (ENI).
    The method selects the best traffic mirror target by looking for a target
    within the current availability zone with the lowest mirror sessions. Targets
    that do not allow traffic on required ports and protocols are not selected.
    Parameters:
        interface_id (str): The ID of the ENI
        targets (list): A list of traffic mirror target IDs
    Returns:
        dict: {"name": <session name>} when a traffic mirror session was
              created, or {"error": <message>} when no candidate target
              survived the availability zone or UDP 4789 checks
    """
    targets = find_local_targets(interface_id, targets)
    if not targets:
        return {
            "error": "None of the specified traffic mirror targets are in the same availability zone as "
            + interface_id
        }

    # The target must accept UDP traffic on port 4789 from this interface
    targets = find_allowed_targets(interface_id, targets, ["udp"], [4789])
    if not targets:
        return {
            "error": "None of the traffic mirror targets support UDP traffic on port 4789"
        }

    target_id = find_target_with_lowest(targets)
    # Create a name by combining network interface ID and target ID. For example:
    # interface_id = eni-055061bf5573d0a3b
    # target_id = tmt-0bce8038d06d6c745
    # name = eh-mirror-05500bce
    name = "eh-mirror-" + interface_id[4:8] + target_id[4:8]
    tags = [
        {
            "ResourceType": "traffic-mirror-session",
            "Tags": [{"Key": "Name", "Value": name}],
        }
    ]
    # The API response is not needed; a failure surfaces as an exception
    EC2_CLIENT.create_traffic_mirror_session(
        NetworkInterfaceId=interface_id,
        TrafficMirrorTargetId=target_id,
        TrafficMirrorFilterId=FILTER_ID,
        SessionNumber=1,
        TagSpecifications=tags,
    )
    return {"name": name}
def find_local_targets(interface_id, targets):
    """
    Method that narrows a list of mirror targets down to the ones located in
    the same availability zone as the specified network interface. When no
    target shares the zone, the result depends on LOCAL_ZONE_ONLY: an empty
    list if it is true, the unmodified input list if it is false.
    Parameters:
        interface_id (str): The ID of the ENI
        targets (list): A list of traffic mirror target IDs
    Returns:
        list: The traffic mirror target IDs in the interface's zone, or,
              when there are none, [] or the original list as described above
    """

    def zone_of(eni_id):
        # Availability zone of a single network interface
        described = EC2_CLIENT.describe_network_interfaces(
            NetworkInterfaceIds=[eni_id]
        )
        return described["NetworkInterfaces"][0]["AvailabilityZone"]

    def eni_of(target_id):
        # Network interface that backs a traffic mirror target
        described = EC2_CLIENT.describe_traffic_mirror_targets(
            Filters=[{"Name": "traffic-mirror-target-id", "Values": [target_id]}]
        )
        return described["TrafficMirrorTargets"][0]["NetworkInterfaceId"]

    wanted_zone = zone_of(interface_id)
    same_zone = [
        target_id
        for target_id in targets
        if zone_of(eni_of(target_id)) == wanted_zone
    ]
    if same_zone:
        return same_zone
    # Nothing is local: either refuse to cross zones or fall back to everything
    return [] if LOCAL_ZONE_ONLY else targets
def find_allowed_targets(interface_id, targets, protocols, port_range):
    """
    Method that returns a list of mirror targets that can accept traffic from the
    specified ENI on the specified port range over the specified protocols.
    The method always checks the security groups attached to the target's
    network interface. The ACL of the target's subnet is checked only when
    the source and the target are in different VPCs.
    Parameters:
        interface_id (str): The ID of the source ENI
        targets (list): A list of traffic mirror target IDs
        protocols (list): The protocols that must be allowed, such as ["udp"]
        port_range (range or list): The ports, at least one of which must be
            allowed
    Returns:
        list: The IDs of the mirror targets that can accept the traffic
    """
    source_interface = EC2_CLIENT.describe_network_interfaces(
        NetworkInterfaceIds=[interface_id]
    )["NetworkInterfaces"][0]
    source_vpc_id = source_interface["VpcId"]

    # Find all IP addresses for the interface
    source_ips = []
    for addr in source_interface["PrivateIpAddresses"]:
        private_ip = addr.get("PrivateIpAddress")
        if private_ip:
            source_ips.append(private_ip)
        # "Association" is absent when the address has no public IP attached,
        # so it must not be indexed unconditionally
        public_ip = addr.get("Association", {}).get("PublicIp")
        if public_ip:
            source_ips.append(public_ip)

    eligible_targets = []
    for target in targets:
        target_interface_id = EC2_CLIENT.describe_traffic_mirror_targets(
            TrafficMirrorTargetIds=[target]
        )["TrafficMirrorTargets"][0]["NetworkInterfaceId"]
        target_interface = EC2_CLIENT.describe_network_interfaces(
            NetworkInterfaceIds=[target_interface_id]
        )["NetworkInterfaces"][0]
        target_vpc_id = target_interface["VpcId"]

        # Check if source and target are on the same VPC. If so, don't check the ACL
        if source_vpc_id != target_vpc_id and not aclAllow(
            target_interface,
            port_range,
            source_ips,
            protocols,
            target_vpc_id,
        ):
            continue

        if groupsAllow(
            source_ips, port_range, target_interface["Groups"], protocols
        ):
            eligible_targets.append(target)
    return eligible_targets
# Network ACL entries report the protocol as a protocol-number string, while
# callers of aclAllow pass protocol names. Map the common numbers to names.
_ACL_PROTOCOL_NAMES = {"1": "icmp", "6": "tcp", "17": "udp", "58": "icmpv6"}


def aclAllow(
    target_interface, required_range, source_ips, protocols, target_vpc_id
):
    """
    Method that indicates whether at least one IP address in a list can send
    traffic through the ACL of the specified target interface over the given protocols
    on at least one port in the required range.
    Inbound entries are evaluated in ascending rule-number order and the first
    entry that matches a given (IP address, protocol, port) combination decides
    it, so an allow entry that follows a matching deny entry is ignored.
    Parameters:
        target_interface (obj): The target interface
        required_range (range or list): The range of ports
        source_ips (list): The list of IP addresses
        protocols (list): The list of protocols, by name ("udp") or number ("17")
        target_vpc_id (str): The ID of the VPC that contains the target interface
    Returns:
        bool: Whether the ACLs allow the communication. False when no ACL is
              associated with the subnet of the target interface.
    """
    subnet_id = target_interface["SubnetId"]
    acls = EC2_CLIENT.describe_network_acls(
        Filters=[{"Name": "vpc-id", "Values": [target_vpc_id]}]
    )["NetworkAcls"]
    acl = findSubnetAcl(subnet_id, acls)
    if acl is None:
        return False

    wanted = {_ACL_PROTOCOL_NAMES.get(str(p), str(p)) for p in protocols}
    required_ports = set(required_range)
    source_addresses = [IPAddress(source_ip) for source_ip in source_ips]

    # Skip outbound rules and evaluate the rest lowest rule number first
    inbound_entries = sorted(
        (entry for entry in acl["Entries"] if not entry["Egress"]),
        key=lambda entry: entry["RuleNumber"],
    )

    confirmed_protocols = set()
    # (ip, protocol, port) combinations already decided by an earlier entry
    decided = set()
    for entry in inbound_entries:
        proto = str(entry["Protocol"])
        proto = _ACL_PROTOCOL_NAMES.get(proto, proto)
        all_protocols = proto == "-1"
        if not all_protocols and proto not in wanted:
            continue
        rule_protocols = wanted if all_protocols else {proto}

        # Skip rules that do not apply to the required port range
        if "PortRange" in entry:
            low = entry["PortRange"]["From"]
            high = entry["PortRange"]["To"]
            # Both ends of an ACL port range are inclusive
            rule_ports = {port for port in required_ports if low <= port <= high}
            if not rule_ports:
                continue
        else:
            # No port range means the entry covers every port; the None
            # placeholder keeps the bookkeeping working for an empty
            # required_range
            rule_ports = required_ports or {None}

        block = entry.get("CidrBlock") or entry.get("Ipv6CidrBlock")
        if not block:
            continue
        network = IPNetwork(block)
        is_allow = entry["RuleAction"] == "allow"

        matched_source = False
        for source_ip, address in zip(source_ips, source_addresses):
            if address not in network:
                continue
            matched_source = True
            for rule_proto in rule_protocols:
                for port in rule_ports:
                    key = (source_ip, rule_proto, port)
                    if key in decided:
                        # An earlier entry already matched this combination,
                        # so this entry is never reached for it
                        continue
                    decided.add(key)
                    if is_allow:
                        confirmed_protocols.add(rule_proto)

        if is_allow and matched_source and confirmed_protocols == wanted:
            return True
    return False
def groupsAllow(source_ips, required_range, interface_groups, protocols):
    """
    Method that evaluates EC2 instance security groups to determine whether at
    least one IP address in a list can send traffic to the instance over the
    given protocols on at least one port in the required range.
    Only the IPv4 CIDR ranges ("IpRanges") of each inbound rule are evaluated.
    Parameters:
        source_ips (list): The list of IP addresses
        required_range (range or list): The range of ports
        interface_groups (list): The specified security groups
        protocols (list): The list of protocols
    Returns:
        bool: Whether the security groups allow the communication
    """
    # NOTE(review): rules whose source is another security group, a prefix
    # list, or an IPv6 range are not evaluated here - confirm whether the
    # deployment relies on such rules.
    wanted = set(protocols)
    source_addresses = [IPAddress(source_ip) for source_ip in source_ips]
    confirmed_protocols = set()
    for int_group in interface_groups:
        sec_groups = EC2_CLIENT.describe_security_groups(
            GroupIds=[int_group["GroupId"]]
        )["SecurityGroups"]
        for sec_group in sec_groups:
            for permission in sec_group["IpPermissions"]:
                # The API reports "all protocols" as the string "-1"
                proto = str(permission["IpProtocol"])
                all_protocols = proto == "-1"
                if not all_protocols and proto not in wanted:
                    continue

                # Skip rules that do not apply to the required port range.
                # An all-protocols rule covers every port and carries no
                # FromPort/ToPort, so the port check is skipped for it.
                if (
                    not all_protocols
                    and "FromPort" in permission
                    and "ToPort" in permission
                ):
                    low = permission["FromPort"]
                    high = permission["ToPort"]
                    # Both ends of the rule's port range are inclusive
                    if not any(low <= port <= high for port in required_range):
                        continue

                networks = [
                    IPNetwork(ip_range["CidrIp"])
                    for ip_range in permission["IpRanges"]
                ]
                source_allowed = any(
                    address in network
                    for network in networks
                    for address in source_addresses
                )
                if not source_allowed:
                    continue

                # If the rule applies to all protocols, then do not
                # check for other protocols
                if all_protocols:
                    return True
                confirmed_protocols.add(proto)
                if confirmed_protocols == wanted:
                    return True
    return False
def findSubnetAcl(subnet_id, acls):
    """
    Method that looks up which ACL a subnet is associated with
    Parameters:
        subnet_id (str): The ID of the subnet
        acls (obj): The ACLs of a VPC
    Returns:
        acl (obj): The first ACL with an association for the subnet, or
                   None if no ACL is associated with it
    """
    candidates = (
        candidate
        for candidate in acls
        if any(
            link["SubnetId"] == subnet_id for link in candidate["Associations"]
        )
    )
    return next(candidates, None)
def find_target_with_lowest(targets):
    """
    Method that picks, from a list of mirror targets, the target that
    currently has the fewest mirror sessions. When several targets tie, the
    one listed first wins.
    Parameters:
        targets (list): A list of traffic mirror target IDs
    Returns:
        str: The ID of the mirror target with the least mirror sessions
    """
    # One lookup per target: how many sessions already point at it
    session_counts = {
        target_id: len(
            EC2_CLIENT.describe_traffic_mirror_sessions(
                Filters=[
                    {"Name": "traffic-mirror-target-id", "Values": [target_id]}
                ]
            )["TrafficMirrorSessions"]
        )
        for target_id in targets
    }
    return min(session_counts, key=lambda target_id: session_counts[target_id])
def lambda_handler(event, context):
    """
    Lambda entry point. Creates a traffic mirror session for the EC2 instance
    named in the event unless a session already exists for its interface.
    Parameters:
        event (dict): The triggering event; the instance ID is read from
            event["detail"]["instance-id"]
        context (obj): The Lambda context object (unused)
    Returns:
        dict: A response with "statusCode" and a JSON-encoded "body":
              200 when a session already exists or was created,
              400 when no eligible traffic mirror target was found,
              404 when the instance has no network interface
    """
    instance_id = event["detail"]["instance-id"]
    response = EC2_CLIENT.describe_instances(InstanceIds=[instance_id])

    # NOTE(review): only the first network interface returned for the instance
    # is handled, which matches the previous behavior - confirm whether
    # instances with several interfaces should get a session per interface.
    interface_id = next(
        (
            interface["NetworkInterfaceId"]
            for reservation in response["Reservations"]
            for instance in reservation["Instances"]
            for interface in instance["NetworkInterfaces"]
        ),
        None,
    )
    if interface_id is None:
        # Previously this case fell through and returned None implicitly
        return {
            "statusCode": 404,
            "body": json.dumps(
                "Error: No network interfaces found for instance " + instance_id
            ),
        }

    # Only create the mirror session if no mirror
    # session exists for this instance
    sessions = EC2_CLIENT.describe_traffic_mirror_sessions(
        Filters=[
            {
                "Name": "network-interface-id",
                "Values": [interface_id],
            }
        ]
    )["TrafficMirrorSessions"]
    if sessions:
        return {
            "statusCode": 200,
            "body": json.dumps("Mirror session already exists for this instance"),
        }

    result = create_mirror(interface_id, TARGETS)
    if "error" in result:
        return {
            "statusCode": 400,
            "body": json.dumps("Error: " + result["error"]),
        }
    return {
        "statusCode": 200,
        "body": json.dumps("Created traffic mirror: " + result["name"]),
    }