-
Notifications
You must be signed in to change notification settings - Fork 8
/
ef_aws_resolver.py
1199 lines (1105 loc) · 45.6 KB
/
ef_aws_resolver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Copyright 2016-2017 Ellation, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
import datetime
import json
import re
from botocore.exceptions import ClientError
from netaddr import IPNetwork
import requests
import ef_utils
class EFAwsResolver(object):
    """
    Resolves template lookup keys to identifiers of existing AWS resources.

    For keys to look up, we use partial ARN syntax to identify system and information sought:
    http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#arn-syntax-ec2
    Not all possible lookups are supported

    Expects these clients to be pre-made and passed in:
      cloudfront, cloudformation, ec2, iam, kms, lambda, route53, waf

    Example:
      the ARN of a security group ID is:
        arn:aws:ec2:us-west-2:1234567890:security-group/security-group-id
      following this pattern:
        arn:aws:<service>:<region>:<account-id>:<key>
      we trim it to:
        <service>:<key>
        ec2:security-group/security-group-id
      which becomes a template token with 'aws:' prepended to identify the lookup provider,
      and the search string appended:
        {{aws:ec2:security-group/security-group-id,proto2-ess-elb}}
      which then is passed to the lookup function here as:
        EFAwsResolver.lookup("ec2:security-group/security-group-id","proto2-ess-elb")
    """
    # dictionary of boto3 clients: {"ec2":ec2_client, ... } made with ef_utils.create_aws_clients
    # NOTE: besides the clients listed in the class docstring, lookup methods of this class also read
    # the "elbv2" and "wafv2" entries, and a "SESSION" entry used to build region-specific clients.
    __CLIENTS = {}
def _elbv2_load_balancer(self, lookup):
    """
    Fetch the description of a single V2 load balancer by its name.
    Args:
      lookup: the friendly name of the V2 elb to look up
    Returns:
      A dict with the load balancer description
    Raises:
      botocore.exceptions.ClientError: no such load-balancer
    """
    elbv2_client = EFAwsResolver.__CLIENTS['elbv2']
    described = elbv2_client.describe_load_balancers(Names=[lookup])
    # Exactly one name was requested, so the first entry is the one we asked for
    return described['LoadBalancers'][0]
def acm_certificate_arn(self, lookup, default=None):
    """
    Find the ARN of an issued ACM certificate for a region/domain pair.
    Args:
      lookup: region/domain on the certificate to be looked up
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      ARN of a certificate with status "Issued" for the region/domain, if found, or default/None if no match
      If more than one "Issued" certificate matches the region/domain:
       - certificates that carry an IssuedAt timestamp take precedence over certificates without one
       - among certificates with an IssuedAt timestamp, the most recent one wins
       - if no matching certificate has an IssuedAt timestamp, the first match listed is returned
    """
    # @todo: Only searches the first 100 certificates in the account
    try:
        # This a region-specific client, so we'll make a new client in the right place using existing SESSION
        region_name, domain_name = lookup.split("/")
        acm_client = EFAwsResolver.__CLIENTS["SESSION"].client(service_name="acm", region_name=region_name)
        response = acm_client.list_certificates(
            CertificateStatuses=['ISSUED'],
            MaxItems=100
        )
    except Exception:
        # Best-effort lookup: malformed lookup string, missing session or API failure all yield the default
        return default

    best_arn = None
    best_issued_at = None
    for cert_handle in response["CertificateSummaryList"]:
        if cert_handle["DomainName"] != domain_name:
            continue
        cert = acm_client.describe_certificate(CertificateArn=cert_handle["CertificateArn"])["Certificate"]
        # IssuedAt may be absent. Track "undated" as None instead of patching in a naive
        # datetime: comparing a naive placeholder with a timezone-aware IssuedAt raises TypeError.
        issued_at = cert.get("IssuedAt")
        if best_arn is None:
            best_arn, best_issued_at = cert["CertificateArn"], issued_at
        elif issued_at is not None and (best_issued_at is None or issued_at > best_issued_at):
            best_arn, best_issued_at = cert["CertificateArn"], issued_at

    if best_arn is None:
        return default
    return best_arn
def ec2_elasticip_elasticip_id(self, lookup, default=None):
    """
    Resolve an Elastic IP resource name to its allocation ID.
    Args:
      lookup: the CloudFormation resource name of the Elastic IP ID to look up
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The AllocationId of the first address returned for the Elastic IP's public address,
      or default/None if the address could not be resolved
    """
    # First resolve the resource name to its public IP address
    address = self.ec2_elasticip_elasticip_ipaddress(lookup)
    if address is None:
        return default
    ec2_client = EFAwsResolver.__CLIENTS["ec2"]
    try:
        described = ec2_client.describe_addresses(PublicIps=[address])
    except ClientError:
        # Public IP not found
        return default
    return described["Addresses"][0]["AllocationId"]
def ec2_elasticip_elasticip_ipaddress(self, lookup, default=None):
    """
    Resolve an Elastic IP CloudFormation resource name to its physical resource ID.
    Args:
      lookup: the CloudFormation resource name of the Elastic IP address to look up
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The PhysicalResourceId of the first stack resource matching 'lookup' in the
      "<env>-elasticip" stack, or default/None if no match
    """
    # Extract environment from resource ID to build stack name.
    # Raw string: "\w" in a plain string literal is an invalid escape sequence.
    m = re.search(r'ElasticIp([A-Z]?[a-z]+[0-9]?)\w+', lookup)
    # The lookup string was not a valid ElasticIp resource label
    if m is None:
        return default
    env = m.group(1)
    stackname = "{}-elasticip".format(env.lower())
    # Convert env substring to title in case {{ENV}} substitution is being used
    lookup = lookup.replace(env, env.title())
    # Look up the EIP resource in the stack to get the IP address assigned to the EIP
    try:
        eip_stack = EFAwsResolver.__CLIENTS["cloudformation"].describe_stack_resources(
            StackName=stackname,
            LogicalResourceId=lookup
        )
    except ClientError:
        return default
    stack_resources = eip_stack["StackResources"]
    # Resource does not exist in stack
    if len(stack_resources) < 1:
        return default
    return stack_resources[0]["PhysicalResourceId"]
def ec2_eni_eni_id(self, lookup, default=None):
    """
    Find an Elastic Network Interface (ENI) by its description.
    Args:
      lookup: the description of the Elastic Network Interface (ENI) to look up
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The ID of the first ENI found with a description matching 'lookup' or default/None if no match found
    """
    description_filter = {'Name': 'description', 'Values': [lookup]}
    response = EFAwsResolver.__CLIENTS["ec2"].describe_network_interfaces(Filters=[description_filter])
    interfaces = response.get("NetworkInterfaces")
    if len(interfaces) == 0:
        return default
    return response["NetworkInterfaces"][0]["NetworkInterfaceId"]
def ec2_network_network_acl_id(self, lookup, default=None):
    """
    Find a network ACL by its Name tag.
    Args:
      lookup: the friendly name of the network ACL we are looking up
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the ID of the first network ACL tagged with the name, or default/None if no match found
    """
    name_filter = {'Name': 'tag:Name', 'Values': [lookup]}
    response = EFAwsResolver.__CLIENTS["ec2"].describe_network_acls(Filters=[name_filter])
    matching_acls = response["NetworkAcls"]
    if len(matching_acls) == 0:
        return default
    return matching_acls[0]["NetworkAclId"]
def ec2_security_group_security_group_id(self, lookup, default=None):
    """
    Find a security group, first by group name and then by Name tag.
    Args:
      lookup: the friendly name of a security group to look up
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      Security group ID if target found or default/None if no match
    """
    # Each attempt is (EC2 filter name, message printed if that search raises).
    # A failed or empty search falls through to the next attempt.
    attempts = (
        ('group-name', "Error occurred while searching by group name: {}"),
        ('tag:Name', "Error occurred while searching by tag name: {}"),
    )
    for filter_name, error_template in attempts:
        try:
            found = EFAwsResolver.__CLIENTS["ec2"].describe_security_groups(Filters=[{
                'Name': filter_name, 'Values': [lookup]
            }])
            if len(found["SecurityGroups"]) > 0:
                return found["SecurityGroups"][0]["GroupId"]
        except Exception as err:
            print(error_template.format(err))
    return default
def ec2_subnet_subnet_id(self, lookup, default=None):
    """
    Find a subnet by its Name tag and return its ID.
    Args:
      lookup: the friendly name of the subnet to look up (subnet-<env>-a or subnet-<env>-b)
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the ID of the first subnet tagged with the name, or default/None if no match
    """
    name_filter = {'Name': 'tag:Name', 'Values': [lookup]}
    response = EFAwsResolver.__CLIENTS["ec2"].describe_subnets(Filters=[name_filter])
    matching_subnets = response["Subnets"]
    if len(matching_subnets) == 0:
        return default
    return matching_subnets[0]["SubnetId"]
def ec2_subnet_subnet_cidr(self, lookup, default=None):
    """
    Find a subnet by its Name tag and return its CIDR block.
    Args:
      lookup: the friendly name of the subnet to look up (subnet-<env>-a or subnet-<env>-b)
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the CIDR block of the first subnet tagged with the name, or default/None if no match
    """
    name_filter = {'Name': 'tag:Name', 'Values': [lookup]}
    response = EFAwsResolver.__CLIENTS["ec2"].describe_subnets(Filters=[name_filter])
    matching_subnets = response["Subnets"]
    if len(matching_subnets) == 0:
        return default
    return matching_subnets[0]["CidrBlock"]
def ec2_vpc_availabilityzones(self, lookup, default=None):
    """
    List the availability zones that have at least one subnet in the named VPC.
    Args:
      lookup: the friendly name of a VPC to look up
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The availability zones in use in the named VPC, sorted and joined with '", "'
      (the caller supplies the outer quotes), or default/None if no match
    """
    vpc_id = self.ec2_vpc_vpc_id(lookup)
    if vpc_id is None:
        return default
    subnets = EFAwsResolver.__CLIENTS["ec2"].describe_subnets(Filters=[{
        'Name': 'vpc-id',
        'Values': [vpc_id]
    }])
    if len(subnets["Subnets"]) == 0:
        return default
    # De-duplicate, then sort so the rendered value is stable between runs
    # (plain set iteration order is arbitrary and would reshuffle the output).
    az_list = sorted({s["AvailabilityZone"] for s in subnets["Subnets"]})
    # Add internal ", " only. This is called literally from: "{{aws...}}" - CF template needs the outer quotes
    return "\", \"".join(az_list)
def ec2_vpc_subnets(self, lookup, default=None):
    """
    List the public subnets of the named VPC.

    A subnet counts as public when the route table that applies to it has a route
    whose GatewayId starts with 'igw-'.
    Args:
      lookup: the friendly name of the VPC whose subnets we want
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The IDs of all public subnets in the named VPC joined with '", "' (the caller supplies
      the outer quotes), or default/None if no match found.
      This will not return the list of private subnets.
    """
    vpc_id = self.ec2_vpc_vpc_id(lookup)
    if vpc_id is None:
        return default
    ec2_client = EFAwsResolver.__CLIENTS["ec2"]
    subnets = ec2_client.describe_subnets(Filters=[{
        'Name': 'vpc-id',
        'Values': [vpc_id]
    }])
    # Routes of the VPC main route table; fetched at most once, and only if some subnet needs them
    main_routes = None
    public_subnet_ids = []
    for subnet in subnets['Subnets']:
        route_tables = ec2_client.describe_route_tables(Filters=[
            {'Name': 'association.subnet-id',
             'Values': [subnet["SubnetId"]]}])['RouteTables']
        if route_tables:
            routes = route_tables[0]['Routes']
        else:
            # No explicit association: the subnet uses the VPC main route table.
            # Indexing [0] on the empty result here used to raise IndexError.
            if main_routes is None:
                main_tables = ec2_client.describe_route_tables(Filters=[
                    {'Name': 'vpc-id', 'Values': [vpc_id]},
                    {'Name': 'association.main', 'Values': ['true']}])['RouteTables']
                main_routes = main_tables[0]['Routes'] if main_tables else []
            routes = main_routes
        is_public = any(route['GatewayId'].startswith('igw-') for route in routes if 'GatewayId' in route)
        if is_public:
            public_subnet_ids.append(subnet["SubnetId"])
    if not public_subnet_ids:
        return default
    # Add internal ", " only. This is called literally from: "{{aws...}}" - reuses the outer quotes
    return "\", \"".join(public_subnet_ids)
def ec2_vpc_cidrblock(self, lookup, default=None):
    """
    Find a VPC by its Name tag and return its CIDR block.
    Args:
      lookup: the friendly name of the VPC whose CIDR block we want
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The CIDR block of the first VPC tagged with the name, or default/None if no match found
    """
    name_filter = {'Name': 'tag:Name', 'Values': [lookup]}
    response = EFAwsResolver.__CLIENTS["ec2"].describe_vpcs(Filters=[name_filter])
    if len(response.get("Vpcs")) == 0:
        return default
    return response["Vpcs"][0]["CidrBlock"]
def ec2_vpc_vpc_id(self, lookup, default=None):
    """
    Find a VPC by its Name tag and return its ID.
    Args:
      lookup: the friendly name of the VPC to look up
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The ID of the first VPC found with a label matching 'lookup' or default/None if no match found
    """
    name_filter = {'Name': 'tag:Name', 'Values': [lookup]}
    response = EFAwsResolver.__CLIENTS["ec2"].describe_vpcs(Filters=[name_filter])
    if len(response.get("Vpcs")) == 0:
        return default
    return response["Vpcs"][0]["VpcId"]
def ec2_vpc_endpoint_id(self, lookup, default=None):
    """
    Find a VPC endpoint by its Name tag and return its ID.
    Args:
      lookup: the name of the VPC endpoint to look up (in tags)
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The ID of the first VPC Endpoint found with a label matching 'lookup' or default/None if no match found
    """
    name_filter = {"Name": "tag:Name", "Values": [lookup]}
    response = EFAwsResolver.__CLIENTS["ec2"].describe_vpc_endpoints(Filters=[name_filter])
    if len(response.get("VpcEndpoints")) == 0:
        return default
    return response["VpcEndpoints"][0]["VpcEndpointId"]
def ec2_vpc_endpoint_id_by_vpc_service(self, lookup, default=None):
    """
    Find the oldest VPC endpoint for a service inside a named VPC and return its ID.
    Args:
      lookup: a forward-slash-delimited string of [vpc-name, service-name] "vpc-name/service-name"
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The ID of the OLDEST VPC endpoint found in the given VPC for the given service,
      or default/None if the VPC or an endpoint could not be found
    """
    vpc_name, service_name = lookup.split("/")
    vpc_id = self.ec2_vpc_vpc_id(vpc_name)
    if vpc_id is None:
        return default
    response = EFAwsResolver.__CLIENTS["ec2"].describe_vpc_endpoints(Filters=[
        {"Name": "vpc-id", "Values": [vpc_id]},
        {"Name": "service-name", "Values": [service_name]}
    ])
    if len(response.get("VpcEndpoints")) < 1:
        return default
    # min() keeps the first of equally old endpoints, like a strict "<" scan would
    first_created = min(response.get("VpcEndpoints"), key=lambda endpoint: endpoint["CreationTimestamp"])
    return first_created["VpcEndpointId"]
def ec2_vpc_endpoint_dns_name(self, lookup, default=None):
    """
    Find a VPC endpoint by its Name tag and return its first DNS name.
    Args:
      lookup: the name of the VPC endpoint to look up (in tags)
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The first DNS Name of the first VPC Endpoint found with a label matching 'lookup',
      or default/None if no endpoint matched or the endpoint has no DNS entries
    """
    name_filter = {"Name": "tag:Name", "Values": [lookup]}
    response = EFAwsResolver.__CLIENTS["ec2"].describe_vpc_endpoints(Filters=[name_filter])
    if len(response.get("VpcEndpoints")) == 0:
        return default
    dns_entries = response["VpcEndpoints"][0]["DnsEntries"]
    if len(dns_entries) < 1:
        return default
    return dns_entries[0]["DnsName"]
def ec2_vpc_endpoint_dns_name_by_vpc_service(self, lookup, default=None):
    """
    Find the oldest VPC endpoint for a service inside a named VPC and return its first DNS name.
    Args:
      lookup: a forward-slash-delimited string of [vpc-name, service-name] "vpc-name/service-name"
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The first DNS Name of the OLDEST VPC endpoint found in the given VPC for the given service,
      or default/None if the VPC, an endpoint, or a DNS entry could not be found
    """
    vpc_name, service_name = lookup.split("/")
    vpc_id = self.ec2_vpc_vpc_id(vpc_name)
    if vpc_id is None:
        return default
    response = EFAwsResolver.__CLIENTS["ec2"].describe_vpc_endpoints(Filters=[
        {"Name": "vpc-id", "Values": [vpc_id]},
        {"Name": "service-name", "Values": [service_name]}
    ])
    if len(response.get("VpcEndpoints")) < 1:
        return default
    # min() keeps the first of equally old endpoints, like a strict "<" scan would
    first_created = min(response.get("VpcEndpoints"), key=lambda endpoint: endpoint["CreationTimestamp"])
    dns_entries = first_created["DnsEntries"]
    if len(dns_entries) < 1:
        return default
    return dns_entries[0]["DnsName"]
def ec2_vpc_vpn_gateway_id(self, lookup, default=None):
    """
    Find a VPN gateway by its Name tag and return its ID.
    Args:
      lookup: the friendly name of the VPN Gateway ID to look up
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The ID of the first VPN Gateway found with a label matching 'lookup' or default/None if no match found
    """
    response = EFAwsResolver.__CLIENTS["ec2"].describe_vpn_gateways(Filters=[{
        "Name": "tag:Name",
        "Values": [lookup]
    }])
    # Test the gateway list itself. The length of the response dict counts its keys,
    # so it was non-zero even with no match and the [0] index then raised IndexError.
    gateways = response.get("VpnGateways", [])
    if not gateways:
        return default
    return gateways[0]["VpnGatewayId"]
def elbv2_load_balancer_hosted_zone(self, lookup, default=None):
    """
    Return the canonical hosted zone ID of a V2 load balancer.
    Args:
      lookup: the friendly name of the V2 elb to look up
      default: value to return in case of no match
    Returns:
      The hosted zone ID of the ELB found with a name matching 'lookup',
      or default if the lookup raised a ClientError
    """
    try:
        return self._elbv2_load_balancer(lookup)['CanonicalHostedZoneId']
    except ClientError:
        return default
def elbv2_load_balancer_dns_name(self, lookup, default=None):
    """
    Return the DNS name of a V2 load balancer.
    Args:
      lookup: the friendly name of the V2 elb to look up
      default: value to return in case of no match
    Returns:
      The DNS name of the ELB found with a name matching 'lookup',
      or default if the lookup raised a ClientError
    """
    try:
        return self._elbv2_load_balancer(lookup)['DNSName']
    except ClientError:
        return default
def elbv2_load_balancer_arn_suffix(self, lookup, default=None):
    """
    Return the trailing `app/<name>/<id>` fragment of a V2 load balancer's ARN.
    Args:
      lookup: the friendly name of the v2 elb to look up
      default: value to return in case of no match
    Returns:
      The shorthand fragment of the ALB's ARN, of the form `app/*/*`, or default if the
      load balancer lookup raised a ClientError or its ARN does not end in that form
    """
    try:
        elb = self._elbv2_load_balancer(lookup)
    except ClientError:
        return default
    m = re.search(r'.+?(app\/[^\/]+\/[^\/]+)$', elb['LoadBalancerArn'])
    # An ARN without an "app/..." tail has no match; return default instead of
    # failing with AttributeError on None.group().
    if m is None:
        return default
    return m.group(1)
def elbv2_target_group_arn(self, lookup, default=None):
    """
    Return the ARN of a V2 target group looked up by name.
    Args:
      lookup: the friendly name of the v2 elb target group
      default: value to return in case of no match
    Returns:
      The full ARN of the target group matching the lookup,
      or default if the lookup raised a ClientError
    """
    try:
        described = EFAwsResolver.__CLIENTS['elbv2'].describe_target_groups(Names=[lookup])
        # Exactly one name was requested, so take the first entry
        return described['TargetGroups'][0]['TargetGroupArn']
    except ClientError:
        return default
def elbv2_target_group_arn_suffix(self, lookup, default=None):
    """
    Return the trailing `targetgroup/<name>/<id>` fragment of a V2 target group's ARN.
    Args:
      lookup: the friendly name of the v2 elb target group
      default: value to return in case of no match
    Returns:
      The shorthand fragment of the target group's ARN, of the form
      `targetgroup/*/*`, or default if the lookup raised a ClientError or the
      ARN does not end in that form
    """
    try:
        client = EFAwsResolver.__CLIENTS['elbv2']
        target_groups = client.describe_target_groups(Names=[lookup])
        target_group = target_groups['TargetGroups'][0]
    except ClientError:
        return default
    m = re.search(r'.+?(targetgroup\/[^\/]+\/[^\/]+)$', target_group['TargetGroupArn'])
    # Guard against an unexpected ARN shape rather than raising AttributeError on None.group()
    if m is None:
        return default
    return m.group(1)
def waf_ip_set_id(self, lookup, default=None):
    """
    Find a WAF IP set by name, paging through all IP sets.
    Args:
      lookup: the friendly name of an IP set
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the ID of the IP set whose name matches 'lookup' or default/None if no match found
    """
    # Request up to 100 IP sets per page
    page_size = 100
    waf_client = EFAwsResolver.__CLIENTS["waf"]
    page = waf_client.list_ip_sets(Limit=page_size)
    while True:
        for ip_set in page["IPSets"]:
            if ip_set["Name"] == lookup:
                return ip_set["IPSetId"]
        # No marker means this was the last page
        if "NextMarker" not in page:
            return default
        page = waf_client.list_ip_sets(Limit=page_size, NextMarker=page["NextMarker"])
def waf_rule_id(self, lookup, default=None):
    """
    Find a WAF rule by name, paging through all rules.
    Args:
      lookup: the friendly name of a WAF rule
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the ID of the WAF rule whose name matches 'lookup' or default/None if no match found
    """
    # Request up to 100 rules per page
    page_size = 100
    page = EFAwsResolver.__CLIENTS["waf"].list_rules(Limit=page_size)
    while True:
        for candidate in page["Rules"]:
            if candidate["Name"] == lookup:
                return candidate["RuleId"]
        # No marker means this was the last page
        if "NextMarker" not in page:
            return default
        page = EFAwsResolver.__CLIENTS["waf"].list_rules(Limit=page_size, NextMarker=page["NextMarker"])
def waf_web_acl_id(self, lookup, default=None):
    """
    Find a WAF Web ACL by name, paging through all Web ACLs.
    Args:
      lookup: the friendly name of a Web ACL
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the ID of the WAF Web ACL whose name matches 'lookup' or default/None if no match found
    """
    # Request up to 100 Web ACLs per page
    page_size = 100
    page = EFAwsResolver.__CLIENTS["waf"].list_web_acls(Limit=page_size)
    while True:
        for web_acl in page["WebACLs"]:
            if web_acl["Name"] == lookup:
                return web_acl["WebACLId"]
        # No marker means this was the last page
        if "NextMarker" not in page:
            return default
        page = EFAwsResolver.__CLIENTS["waf"].list_web_acls(Limit=page_size, NextMarker=page["NextMarker"])
def get_cloudflare_cidrs(self):
    """
    Download the published list of Cloudflare IPv4 CIDR blocks.
    Args: None
    Returns:
      List of Cloudflare CIDR blocks (strings) for whitelisting in AWS WAF
    Raises:
      requests.exceptions.HTTPError: the download returned an HTTP error status
    """
    r = requests.get('https://cloudflare.com/ips-v4', timeout=10)
    r.raise_for_status()
    # Drop blank lines and surrounding whitespace: a trailing newline or CRLF line endings
    # would otherwise produce empty or padded entries that are not valid CIDR strings.
    stripped_lines = (line.strip() for line in r.text.splitlines())
    return [line for line in stripped_lines if line]
def waf_cloudflare_ip_list(self):
    """
    Build the WAFv1 IP set descriptors for all Cloudflare CIDR blocks.
    Args: None
    Returns:
      JSON array of WAFv1 IPSetDescriptors containing all Cloudflare CIDR IPs
    Notes:
      CIDRs in this array will be broken down to multiple /16's if original prefix is /1-/15
      due to limitations with WAFv1
    """
    expanded = []
    for block in self.get_cloudflare_cidrs():
        network = IPNetwork(block)
        if network.size <= 65536:
            # /16 or smaller: keep the block as published
            expanded.append(str(block))
            continue
        # Larger than a /16: split into its /16 subnets
        expanded.extend(str(piece) for piece in network.subnet(16))
    expanded.sort()
    descriptors = [{"Type": "IPV4", "Value": value} for value in expanded]
    return json.dumps(descriptors)
def wafv2_cloudflare_ip_list(self):
    """
    Serialize the Cloudflare CIDR blocks for use in a WAFv2 IP set.
    Args: None
    Returns:
      JSON array of Cloudflare IPs in CIDR notation for whitelisting in WAFv2
    """
    cidr_blocks = list(self.get_cloudflare_cidrs())
    return json.dumps(cidr_blocks)
def wafv2_global_ip_set_arn(self, lookup, default=None):
    """
    Find a WAFv2 CLOUDFRONT-scope IP set by name and build its ARN.
    Args:
      lookup: the friendly name of a Waf v2 IP set
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the ARN of the WAFv2 Global (Cloudfront) IP set whose name matches 'lookup' or default/None if no match found
      note that global ARNs are always in us-east-1, which is why the region is hardcoded below;
      the account portion is emitted as a literal {{ACCOUNT}} token
    """
    # Request up to 100 IP sets per page
    page_size = 100
    wafv2_client = EFAwsResolver.__CLIENTS["wafv2"]
    page = wafv2_client.list_ip_sets(Limit=page_size, Scope="CLOUDFRONT")
    while True:
        for ip_set in page["IPSets"]:
            if ip_set["Name"] == lookup:
                return "arn:aws:wafv2:us-east-1:{{{{ACCOUNT}}}}:global/ipset/{}/{}".format(lookup, ip_set["Id"])
        # No marker means this was the last page
        if "NextMarker" not in page:
            return default
        page = wafv2_client.list_ip_sets(Limit=page_size, Scope="CLOUDFRONT", NextMarker=page["NextMarker"])
def wafv2_global_rule_group_arn(self, lookup, default=None):
    """
    Find a WAFv2 CLOUDFRONT-scope rule group by name and build its ARN.
    Args:
      lookup: the friendly name of a Waf v2 rule group
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the ARN of the WAFv2 Global (Cloudfront) Rule Group whose name matches 'lookup' or default/None if no match found
      note that global ARNs are always in us-east-1, which is why the region is hardcoded below;
      the account portion is emitted as a literal {{ACCOUNT}} token
    """
    # Request up to 100 rule groups per page
    page_size = 100
    page = EFAwsResolver.__CLIENTS["wafv2"].list_rule_groups(Limit=page_size, Scope="CLOUDFRONT")
    while True:
        for rule_group in page["RuleGroups"]:
            if rule_group["Name"] == lookup:
                return "arn:aws:wafv2:us-east-1:{{{{ACCOUNT}}}}:global/rulegroup/{}/{}".format(lookup, rule_group["Id"])
        # No marker means this was the last page
        if "NextMarker" not in page:
            return default
        page = EFAwsResolver.__CLIENTS["wafv2"].list_rule_groups(
            Limit=page_size, Scope="CLOUDFRONT", NextMarker=page["NextMarker"])
def wafv2_global_web_acl_arn(self, lookup, default=None):
    """
    Find a WAFv2 CLOUDFRONT-scope Web ACL by name and build its ARN.
    Args:
      lookup: the friendly name of a Waf v2 ACL
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the ARN of the WAFv2 Global (Cloudfront) Web ACL whose name matches 'lookup' or default/None if no match found
      note that global ARNs are always in us-east-1, which is why the region is hardcoded below;
      the account portion is emitted as a literal {{ACCOUNT}} token
    """
    # Request up to 100 Web ACLs per page
    page_size = 100
    page = EFAwsResolver.__CLIENTS["wafv2"].list_web_acls(Limit=page_size, Scope="CLOUDFRONT")
    while True:
        for web_acl in page["WebACLs"]:
            if web_acl["Name"] == lookup:
                return "arn:aws:wafv2:us-east-1:{{{{ACCOUNT}}}}:global/webacl/{}/{}".format(lookup, web_acl["Id"])
        # No marker means this was the last page
        if "NextMarker" not in page:
            return default
        page = EFAwsResolver.__CLIENTS["wafv2"].list_web_acls(
            Limit=page_size, Scope="CLOUDFRONT", NextMarker=page["NextMarker"])
def route53_public_hosted_zone_id(self, lookup, default=None):
    """
    Find the public Route 53 hosted zone for a fully-qualified zone name.
    Args:
      lookup: The zone name to look up. Must end with "."
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the ID of the public hosted zone for the 'lookup' domain, or default/None if no match found
      (including when 'lookup' is empty or does not end with ".")
    """
    list_limit = "100"
    # enforce terminal '.' in name, otherwise we could get a partial match of the incorrect zones.
    # endswith() also covers the empty string, which indexing with [-1] would crash on.
    if not lookup.endswith('.'):
        return default
    route53_client = EFAwsResolver.__CLIENTS["route53"]
    hosted_zones = route53_client.list_hosted_zones_by_name(DNSName=lookup, MaxItems=list_limit)
    # Return if the account has no HostedZones
    if "HostedZones" not in hosted_zones:
        return default
    while True:
        for hosted_zone in hosted_zones["HostedZones"]:
            if lookup == hosted_zone["Name"] and not hosted_zone["Config"]["PrivateZone"]:
                # Id looks like "/hostedzone/<id>"; keep only the trailing <id>
                return hosted_zone["Id"].split("/")[2]
        if not hosted_zones["IsTruncated"]:
            return default
        hosted_zones = route53_client.list_hosted_zones_by_name(
            DNSName=hosted_zones["NextDNSName"], HostedZoneId=hosted_zones["NextHostedZoneId"], MaxItems=list_limit)
def route53_private_hosted_zone_id(self, lookup, default=None):
    """
    Find the private Route 53 hosted zone for a fully-qualified zone name.
    Args:
      lookup: The zone name to look up. Must end with "."
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the ID of the private hosted zone for the 'lookup' domain, or default/None if no match found
      (including when 'lookup' is empty or does not end with ".")
    """
    list_limit = "100"
    # enforce terminal '.' in name, otherwise we could get a partial match of the incorrect zones.
    # endswith() also covers the empty string, which indexing with [-1] would crash on.
    if not lookup.endswith('.'):
        return default
    route53_client = EFAwsResolver.__CLIENTS["route53"]
    hosted_zones = route53_client.list_hosted_zones_by_name(DNSName=lookup, MaxItems=list_limit)
    # Return if the account has no HostedZones
    if "HostedZones" not in hosted_zones:
        return default
    while True:
        for hosted_zone in hosted_zones["HostedZones"]:
            if lookup == hosted_zone["Name"] and hosted_zone["Config"]["PrivateZone"]:
                # Id looks like "/hostedzone/<id>"; keep only the trailing <id>
                return hosted_zone["Id"].split("/")[2]
        if not hosted_zones["IsTruncated"]:
            return default
        hosted_zones = route53_client.list_hosted_zones_by_name(
            DNSName=hosted_zones["NextDNSName"], HostedZoneId=hosted_zones["NextHostedZoneId"], MaxItems=list_limit)
def ec2_route_table_main_route_table_id(self, lookup, default=None):
    """
    Find the main route table of a named VPC.
    Args:
      lookup: the friendly name of the VPC whose main route table we are looking up
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the ID of the main route table of the named VPC, or default if no match/multiple matches found
    """
    vpc_id = self.ec2_vpc_vpc_id(lookup)
    if vpc_id is None:
        return default
    route_table = EFAwsResolver.__CLIENTS["ec2"].describe_route_tables(Filters=[
        {'Name': 'vpc-id', 'Values': [vpc_id]},
        {'Name': 'association.main', 'Values': ['true']}
    ])
    # Compare by value: "is not 1" tests object identity, which is not a reliable
    # way to compare integers and triggers a SyntaxWarning on current Python.
    if len(route_table["RouteTables"]) != 1:
        return default
    return route_table["RouteTables"][0]["RouteTableId"]
def ec2_route_table_tagged_route_table_id(self, lookup, default=None):
    """
    Find a route table by tag.

    The search filters on tag-key 'Name' and tag-value 'lookup'.
    Args:
      lookup: the tagged route table name, should be unique
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the ID of the route table, or default if no match/multiple matches found
    """
    route_table = EFAwsResolver.__CLIENTS["ec2"].describe_route_tables(Filters=[
        {'Name': 'tag-key', 'Values': ['Name']},
        {'Name': 'tag-value', 'Values': [lookup]}
    ])
    # Compare by value: "is not 1" tests object identity, which is not a reliable
    # way to compare integers and triggers a SyntaxWarning on current Python.
    if len(route_table["RouteTables"]) != 1:
        return default
    return route_table["RouteTables"][0]["RouteTableId"]
def cloudfront_domain_name(self, lookup, default=None):
    """
    Find the Cloudfront distribution that carries a given CNAME and return its domain name.
    Args:
      lookup: any CNAME on the Cloudfront distribution
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The domain name (FQDN) of the Cloudfront distribution, or default/None if no match
    """
    # Request up to 100 distributions per page
    list_limit = "100"
    cloudfront_client = EFAwsResolver.__CLIENTS["cloudfront"]
    distributions = cloudfront_client.list_distributions(MaxItems=list_limit)["DistributionList"]
    # Return if the account has no Distributions
    if "Items" not in distributions:
        return default
    while True:
        for distribution in distributions.get("Items", []):
            # A distribution without aliases may have no "Items" list under "Aliases";
            # treat that as "no CNAMEs" instead of raising KeyError.
            aliases = distribution.get("Aliases", {}).get("Items", [])
            if lookup in aliases:
                return distribution["DomainName"]
        if not distributions["IsTruncated"]:
            return default
        distributions = cloudfront_client.list_distributions(
            MaxItems=list_limit, Marker=distributions["NextMarker"])["DistributionList"]
def cloudfront_origin_access_identity_oai_id(self, lookup, default=None):
    """
    Resolve an Origin Access Identity ID from the FQDN stored in its comment.
    Args:
      lookup: the FQDN of the Origin Access Identity (from its comments)
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the ID of the Origin Access Identity whose comment equals 'lookup', or default/None if no match
    """
    cloudfront = EFAwsResolver.__CLIENTS["cloudfront"]
    # The API hands back at most 100 OAIs per request
    page_size = "100"
    page = cloudfront.list_cloud_front_origin_access_identities(
        MaxItems=page_size)["CloudFrontOriginAccessIdentityList"]
    # An account without any OAIs has no "Items" key in the listing
    if "Items" not in page:
        return default
    while True:
        for identity in page["Items"]:
            if identity["Comment"] == lookup:
                return identity["Id"]
        # Nothing matched on this page; stop unless the listing continues
        if not page["IsTruncated"]:
            return default
        page = cloudfront.list_cloud_front_origin_access_identities(
            MaxItems=page_size, Marker=page["NextMarker"])["CloudFrontOriginAccessIdentityList"]
def cloudfront_origin_access_identity_oai_canonical_user_id(self, lookup, default=None):
    """
    Resolve an Origin Access Identity's S3 canonical user ID from the FQDN stored in its comment.
    Args:
      lookup: the FQDN of the Origin Access Identity (from its comments)
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the S3 Canonical User ID of the OAI whose comment equals 'lookup', or default/None if no match
    """
    cloudfront = EFAwsResolver.__CLIENTS["cloudfront"]
    # The API hands back at most 100 OAIs per request
    page_size = "100"
    page = cloudfront.list_cloud_front_origin_access_identities(
        MaxItems=page_size)["CloudFrontOriginAccessIdentityList"]
    # An account without any OAIs has no "Items" key in the listing
    if "Items" not in page:
        return default
    while True:
        for identity in page["Items"]:
            if identity["Comment"] == lookup:
                return identity["S3CanonicalUserId"]
        # Nothing matched on this page; stop unless the listing continues
        if not page["IsTruncated"]:
            return default
        page = cloudfront.list_cloud_front_origin_access_identities(
            MaxItems=page_size, Marker=page["NextMarker"])["CloudFrontOriginAccessIdentityList"]
def cognito_identity_identity_pool_arn(self, lookup, default=None):
    """
    Build the ARN of a Cognito identity pool from its name.
    Args:
      lookup: Cognito Federated Identity name, proto0-cms-identity-pool
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the constructed ARN for the cognito identity pool, else default/None
    """
    pool_id = self.cognito_identity_identity_pool_id(lookup, default)
    if pool_id != default:
        # No boto3 call returns the full ARN of a cognito identity pool, so it is assembled here.
        # The quadrupled braces come out of format() as literal {{REGION}} / {{ACCOUNT}} tokens.
        arn_template = "arn:aws:cognito-identity:{{{{REGION}}}}:{{{{ACCOUNT}}}}:identitypool/{}"
        return arn_template.format(pool_id)
    return default
def cognito_identity_identity_pool_id(self, lookup, default=None):
    """
    Find a Cognito identity pool by name and return its ID.
    Args:
      lookup: Cognito Federated Identity name, proto0-cms-identity-pool
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the Cognito Identity Pool ID corresponding to the given lookup, else default/None
    """
    cognito = EFAwsResolver.__CLIENTS["cognito-identity"]
    # List size cannot be greater than 60
    page_size = 60
    page = cognito.list_identity_pools(MaxResults=page_size)
    while "IdentityPools" in page:
        # Scan every identity pool on the current page
        for candidate in page["IdentityPools"]:
            if candidate["IdentityPoolName"] == lookup:
                return candidate["IdentityPoolId"]
        # No match here; give up unless another page is available
        if "NextToken" not in page:
            return default
        page = cognito.list_identity_pools(MaxResults=page_size, NextToken=page["NextToken"])
    return default
def cognito_idp_user_pool_arn(self, lookup, default=None):
    """
    Find a Cognito user pool by name and return its ARN.
    Args:
      lookup: Cognito User Pool name, proto0-cms-user-pool
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the User Pool ARN corresponding to the given lookup, else default/None
    """
    idp = EFAwsResolver.__CLIENTS["cognito-idp"]
    # Resolve the name to a pool ID first; describe_user_pool needs the ID
    pool_id = self.cognito_idp_user_pool_id(lookup, default)
    if pool_id == default:
        return default
    description = idp.describe_user_pool(UserPoolId=pool_id)
    if "UserPool" in description:
        return description["UserPool"]["Arn"]
    return default
def cognito_idp_user_pool_id(self, lookup, default=None):
    """
    Find a Cognito user pool by name and return its ID.
    Args:
      lookup: Cognito User Pool name, proto0-cms-user-pool
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      the User Pool ID corresponding to the given lookup, else default/None
    """
    # List size cannot be greater than 60
    list_limit = 60
    client = EFAwsResolver.__CLIENTS["cognito-idp"]
    response = client.list_user_pools(MaxResults=list_limit)
    while "UserPools" in response:
        # Loop through all user pools
        for pool in response["UserPools"]:
            if pool["Name"] == lookup:
                return pool["Id"]
        # No match found on this page; stop unless there are more pages
        if "NextToken" not in response:
            break
        # Follow-up pages must come from list_user_pools as well; the cognito-idp
        # client has no list_identity_pools operation (that belongs to cognito-identity).
        response = client.list_user_pools(MaxResults=list_limit, NextToken=response["NextToken"])
    return default
def kms_decrypt_value(self, lookup):
    """
    Decrypt a value through ef_utils.kms_decrypt using the "kms" client.
    Args:
      lookup: the encrypted value to be decrypted by KMS; base64 encoded
    Returns:
      The decrypted lookup value: the result's plaintext decoded with the 'string_escape' codec
    """
    plaintext = ef_utils.kms_decrypt(EFAwsResolver.__CLIENTS["kms"], lookup).plaintext
    # NOTE(review): 'string_escape' is a Python 2-only codec; this line raises LookupError
    # on Python 3 -- confirm the interpreter this module targets.
    return plaintext.decode('string_escape')
def kms_key_arn(self, lookup):
    """
    Resolve a KMS key alias to its ARN via ef_utils.kms_key_arn using the "kms" client.
    Args:
      lookup: The key alias, EX: alias/proto0-evs-drm
    Returns:
      The full key arn
    """
    return ef_utils.kms_key_arn(EFAwsResolver.__CLIENTS["kms"], lookup)
def ram_resource_share_arn(self, lookup, default=None):
    """
    Find a RAM resource share owned by another account by name and return its ARN.
    Args:
      lookup: the name of the resource share to look up
      default: the optional value to return if lookup failed; returns None if not set
    Returns:
      The arn of the first resource share found with a name matching 'lookup' or default/None if no match found
    """
    response = EFAwsResolver.__CLIENTS["ram"].get_resource_shares(
        resourceOwner="OTHER-ACCOUNTS",
        name=lookup)
    # A missing "resourceShares" key is treated like an empty result rather than
    # letting len(None) raise TypeError.
    shares = response.get("resourceShares") or []
    if shares:
        return shares[0]["resourceShareArn"]
    return default
def ram_resource_arn(self, lookup, default=None):
"""
Args:
lookup: the resource share arn to look up