-
Notifications
You must be signed in to change notification settings - Fork 1
/
app.py
123 lines (91 loc) · 3.25 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import argparse
import asyncio
import json
import sys
import os
import logging
from functools import partial
from aiohttp import web
from os import path
from requirements import Requirements
SOURCES_DIR = "/var/task"
SOURCES_REQUIREMENTS_NAME = path.join(SOURCES_DIR, "requirements.txt")
LAMBDA_USER_ID = 496
logger = logging.getLogger('lambda')
def demote(user_uid, user_gid):
os.setgid(user_gid)
os.setuid(user_uid)
def jsonify(data, status_code=200):
return web.Response(
text=json.dumps(data),
headers={'Content-Type': 'application/json'},
status=status_code)
async def parse_request(request):
data = await request.json()
arn_string = data.get('arn', '')
version = data.get('version', '')
event = data.get('event', '{}')
module = data.get('module', '')
handler = data.get('file', 'handler.handler')
if isinstance(event, str):
event = json.loads(event)
return handler, event
async def async_execute(handler, event, requirements):
process = await asyncio.create_subprocess_exec(
"python", "bootstrap.py", handler, json.dumps(event),
cwd='/var/runtime/awslambda/',
env={**os.environ, 'PYTHONPATH': requirements.directory},
preexec_fn=partial(demote, LAMBDA_USER_ID, LAMBDA_USER_ID),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if stdout:
stdout = json.loads(stdout.decode('utf-8'))
if stderr:
stderr = stderr.decode('utf-8')
return {'stdout': stdout, 'stderr': stderr}
async def run_lambda(requirements, request):
handler, event = await parse_request(request)
result = await async_execute(handler, event, requirements)
return jsonify(data=result)
async def index(request):
return web.FileResponse('./index.html')
def init_logging(args):
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(message)s')
logging.getLogger('aiohttp').setLevel(logging.WARN)
def create_app(requirements):
app = web.Application()
app.router.add_get('/', index)
app.router.add_post('/', partial(run_lambda, requirements))
return app
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument(
'-t', '--timeout', type=float,
default=6, help="Default timeout for function")
parser.add_argument(
'-d', '--directory', default=SOURCES_DIR,
help="Directory inside container with a source code")
parser.add_argument(
'-r', '--requirements', type=str, default=SOURCES_REQUIREMENTS_NAME)
parser.add_argument(
'--force', action='store_true', default=False)
return parser.parse_args()
def install_requirements(requirements_file_path):
req = Requirements(requirements_file_path)
req.ensure_installed()
return req
def main():
args = get_args()
init_logging(args)
requirements = install_requirements(path.join(args.directory, args.requirements))
app = create_app(requirements)
host = '0.0.0.0'
port = 8080
web.run_app(app, host=host, port=port)
if __name__ == '__main__':
exit(main())