"Lambda function that lists the pods in an EKS cluster"
import base64
import os
import logging
import re
import boto3
from botocore.signers import RequestSigner
from kubernetes import client, config
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Lifetime, in seconds, of the presigned STS request that becomes the EKS bearer
# token (passed as expires_in in get_bearer_token()).
STS_TOKEN_EXPIRES_IN = 60
session = boto3.session.Session()
sts = session.client('sts')
# Service id of STS, handed to RequestSigner in get_bearer_token().
service_id = sts.meta.service_model.service_id
# Required configuration: a missing CLUSTER_NAME raises KeyError at import time,
# i.e. during the Lambda cold start rather than on a later invocation.
cluster_name = os.environ["CLUSTER_NAME"]
eks = boto3.client('eks')
# Per-execution-environment cache of get_cluster_info() results, keyed by
# cluster name; filled and read by lambda_handler().
cluster_cache = {}
def get_cluster_info():
    """Return the API endpoint and CA data of the configured EKS cluster.

    Calls ``eks.describe_cluster`` for ``cluster_name`` and returns a dict
    with two keys: ``"endpoint"`` (the cluster's HTTPS server URL) and
    ``"ca"`` (the certificate authority data exactly as EKS returns it).
    """
    described = eks.describe_cluster(name=cluster_name)['cluster']
    return {
        "endpoint": described['endpoint'],
        "ca": described['certificateAuthority']['data'],
    }
def get_bearer_token():
    """Build an EKS bearer token for the configured cluster.

    The token is a presigned STS ``GetCallerIdentity`` request that carries
    the ``x-k8s-aws-id`` header naming the cluster, URL-safe base64 encoded
    with padding stripped and prefixed with ``k8s-aws-v1.``. The presigned
    request expires after STS_TOKEN_EXPIRES_IN seconds. The returned string
    grants access to the cluster and must be handled as a credential
    (never logged or persisted).
    """
    region = session.region_name
    signer = RequestSigner(
        service_id,
        region,
        'sts',
        'v4',
        session.get_credentials(),
        session.events,
    )
    # NOTE(review): the sts.<region>.amazonaws.com host assumes the standard
    # AWS partition — confirm before deploying to other partitions.
    sts_url = f'https://sts.{region}.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15'
    request = {
        'method': 'GET',
        'url': sts_url,
        'body': {},
        'headers': {'x-k8s-aws-id': cluster_name},
        'context': {},
    }
    presigned = signer.generate_presigned_url(
        request,
        region_name=region,
        expires_in=STS_TOKEN_EXPIRES_IN,
        operation_name='',
    )
    encoded = base64.urlsafe_b64encode(presigned.encode('utf-8')).decode('utf-8')
    # base64 uses '=' only as trailing padding, which the token format omits.
    return 'k8s-aws-v1.' + encoded.rstrip('=')
def lambda_handler(_event, _context):
    """Lambda entry point: count the pods in the ``default`` namespace.

    The event and context are unused. The cluster endpoint and CA data are
    fetched once per execution environment and kept in ``cluster_cache``;
    a new bearer token is generated on every invocation (the presigned
    request behind it expires after STS_TOKEN_EXPIRES_IN seconds).
    Returns a human-readable sentence with the pod count.
    """
    try:
        cluster = cluster_cache[cluster_name]
    except KeyError:
        # First invocation in this execution environment: ask EKS and keep
        # the answer for reuse by later warm invocations.
        cluster = cluster_cache[cluster_name] = get_cluster_info()

    cluster_entry = {
        'name': 'cluster1',
        'cluster': {
            'certificate-authority-data': cluster["ca"],
            'server': cluster["endpoint"],
        },
    }
    context_entry = {
        'name': 'context1',
        'context': {'cluster': 'cluster1', "user": "user1"},
    }
    user_entry = {'name': 'user1', "user": {'token': get_bearer_token()}}
    kubeconfig = {
        'apiVersion': 'v1',
        'clusters': [cluster_entry],
        'contexts': [context_entry],
        'current-context': 'context1',
        'kind': 'Config',
        'preferences': {},
        'users': [user_entry],
    }
    config.load_kube_config_from_dict(config_dict=kubeconfig)
    pods = client.CoreV1Api().list_namespaced_pod("default")
    return f"There are {len(pods.items)} pods in the default namespace."
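if __name__ == "__main__":
    # Local smoke test only. Without this guard the handler runs at import
    # time, i.e. on every Lambda cold start, making EKS and Kubernetes API
    # calls before any real invocation and failing the init if they error.
    print(lambda_handler(None, None))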