-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
srv.py
210 lines (168 loc) · 6.38 KB
/
srv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
#!/usr/bin/env python
# vim: set encoding=utf-8
"""
Main server program.
"""
from __future__ import print_function
from gevent.monkey import patch_all
from gevent.pywsgi import WSGIServer
patch_all()
# pylint: disable=wrong-import-position,wrong-import-order
import sys
import logging
import os
import requests
import jinja2
from flask import Flask, request, send_from_directory, redirect, Response
MYDIR = os.path.abspath(os.path.join(__file__, '..', '..'))
sys.path.append("%s/lib/" % MYDIR)
from globals import FILE_QUERIES_LOG, LOG_FILE, TEMPLATES, STATIC, MALFORMED_RESPONSE_HTML_PAGE, SERVER_ADDRESS, SERVER_PORT
from limits import Limits
from cheat_wrapper import cheat_wrapper
from post import process_post_request
from options import parse_args
from stateful_queries import save_query, last_query
# pylint: disable=wrong-import-position,wrong-import-order
# Make sure the directory that will hold LOG_FILE exists before logging
# is pointed at it.
if not os.path.exists(os.path.dirname(LOG_FILE)):
    os.makedirs(os.path.dirname(LOG_FILE))
logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s %(message)s')

app = Flask(__name__) # pylint: disable=invalid-name
# Templates are looked up in the application's own loader first and then
# in the TEMPLATES directory.
app.jinja_loader = jinja2.ChoiceLoader([
    app.jinja_loader,
    jinja2.FileSystemLoader(TEMPLATES),
])

# Shared Limits instance; answer() asks it (LIMITS.check_ip) whether a
# client IP may run a query whose topic contains '+'.
LIMITS = Limits()
def is_html_needed(user_agent):
    """
    Basing on `user_agent`, return whether it needs HTML or ANSI.

    The decision is a case-insensitive substring match of `user_agent`
    against the names of known plain text (command line) HTTP clients,
    so raw header values such as ``Wget/1.20`` or ``HTTPie/2.4`` are
    recognized even if the caller did not lower-case them first.

    :param user_agent: value of the ``User-Agent`` header; may be empty
                       or None, which is treated as "not a plain text client"
    :return: False if `user_agent` names a plain text client, True otherwise
    """
    plaintext_clients = ('curl', 'wget', 'fetch', 'httpie', 'lwp-request',
                         'openbsd ftp', 'python-requests')
    agent = (user_agent or '').lower()
    return not any(client in agent for client in plaintext_clients)
def is_result_a_script(query):
    """
    Return True if `query` is one of the topics whose result is a script
    (currently only `:cht.sh`), False otherwise.
    """
    script_topics = (':cht.sh',)
    return query in script_topics
@app.route('/files/<path:path>')
def send_static(path):
    """
    Serve the file `path` from the STATIC directory.
    The HTTP frontend can serve these files on its own.
    """
    static_response = send_from_directory(STATIC, path)
    return static_response
@app.route('/favicon.ico')
def send_favicon():
    """
    Serve `favicon.ico` from the STATIC directory.
    The HTTP frontend can serve this file on its own.
    """
    icon_name = 'favicon.ico'
    return send_from_directory(STATIC, icon_name)
@app.route('/malformed-response.html')
def send_malformed():
    """
    Serve `malformed-response.html` from the STATIC directory.
    The HTTP frontend can serve this file on its own.
    """
    page_name = 'malformed-response.html'
    return send_from_directory(STATIC, page_name)
def log_query(ip_addr, found, topic, user_agent):
    """
    Log processed query and some internal data.

    Appends one line of the form ``<ip_addr> <found> <topic> <user_agent>``
    to FILE_QUERIES_LOG. The file is written as UTF-8 text.

    :param ip_addr: address of the client that issued the query
    :param found: lookup status as returned by the query processor
    :param topic: requested topic
    :param user_agent: client's User-Agent string
    """
    import io  # local import: keeps this block self-contained

    log_entry = "%s %s %s %s" % (ip_addr, found, topic, user_agent)
    # On Python 2 the formatted entry may be a byte string; the text-mode
    # file below accepts only unicode, so decode it first. On Python 3 the
    # entry is always text and this branch is skipped.
    if isinstance(log_entry, bytes):
        log_entry = log_entry.decode('utf-8', 'replace')

    # io.open behaves the same on Python 2 and 3 and does the encoding
    # itself, so no manual bytes/str concatenation is needed.
    with io.open(FILE_QUERIES_LOG, 'a', encoding='utf-8') as my_file:
        my_file.write(log_entry + u"\n")
def get_request_ip(req):
    """
    Extract IP address from `request`.

    If the request carries an ``X-Forwarded-For`` header, the first
    (leftmost) address of its first value is used, with the IPv4-mapped
    IPv6 prefix ``::ffff:`` removed. Otherwise, or if that header yields
    an empty address, `req.remote_addr` is returned.

    NOTE: ``X-Forwarded-For`` is supplied by the client or by proxies in
    front of the server, so the result is only as trustworthy as they are.

    :param req: request object providing `headers.getlist()` and `remote_addr`
    :return: client IP address as a string
    """
    forwarded = req.headers.getlist("X-Forwarded-For")
    if forwarded:
        # The header may hold a comma separated chain "client, proxy1, ...";
        # the originating client is the leftmost entry.
        ip_addr = forwarded[0].split(',')[0].strip()
        if ip_addr.startswith('::ffff:'):
            ip_addr = ip_addr[7:]
        if ip_addr:
            return ip_addr
    return req.remote_addr
def _proxy(*args, **kwargs):
    """
    Forward the current request to port 3000 and relay the answer.

    The target URL is built from the current request URL: everything before
    `/:shell-x/` gets `:3000/` appended, followed by either the remainder
    of the original URL or, if the request has a `q` argument, by a query
    string with one `arg=<word>` pair per whitespace separated word of `q`.
    Method, headers (except `Host`), body and cookies are passed on;
    redirects are not followed. The positional and keyword arguments are
    accepted but not used.

    :return: Response with the upstream content and status code and the
             upstream headers minus content-encoding, content-length,
             transfer-encoding and connection
    """
    prefix, suffix = request.url.split('/:shell-x/', 1)
    if 'q' in request.args:
        words = request.args['q'].split()
        suffix = '?' + "&".join(["arg=%s" % word for word in words])
    target = prefix + ':3000/' + suffix
    print(target)

    body = request.get_data()
    print(body)

    forwarded_headers = {}
    for key, value in request.headers:
        if key != 'Host':
            forwarded_headers[key] = value

    upstream = requests.request(
        method=request.method,
        url=target,
        headers=forwarded_headers,
        data=body,
        cookies=request.cookies,
        allow_redirects=False)

    # These headers describe the upstream transfer, not the relayed one.
    dropped = ('content-encoding', 'content-length', 'transfer-encoding', 'connection')
    relayed_headers = []
    for name, value in upstream.raw.headers.items():
        if name.lower() not in dropped:
            relayed_headers.append((name, value))

    return Response(upstream.content, upstream.status_code, relayed_headers)
@app.route("/", methods=['GET', 'POST'])
@app.route("/<path:topic>", methods=["GET", "POST"])
def answer(topic=None):
    """
    Main rendering function, it processes incoming queries.
    Depending on user agent it returns output in HTML or ANSI format.

    Processing order:
      * requests for touch icons / nested `favicon.ico` get an empty answer;
      * `:last` is replaced by the last query saved for the `id` cookie,
        any other topic is saved for that cookie (if the cookie is set);
      * POST requests are handed over to `process_post_request`;
      * a `topic` request argument causes a redirect to `/<topic>`;
      * topics starting with `:shell-x/` are proxied (see `_proxy`);
      * topics containing `+` are subject to `LIMITS.check_ip` and may be
        rejected with status 429;
      * everything else is answered by `cheat_wrapper` and logged.

    Incoming data:
        request.args
        request.headers
        request.remote_addr
        request.referrer
        request.query_string

    :param topic: path part of the URL, None for the root page
    :return: a Flask response (body string, (body, status) tuple, redirect
             or Response object)
    """

    user_agent = request.headers.get('User-Agent', '').lower()
    html_needed = is_html_needed(user_agent)
    options = parse_args(request.args)

    if topic in ['apple-touch-icon-precomposed.png', 'apple-touch-icon.png', 'apple-touch-icon-120x120-precomposed.png'] \
            or (topic is not None and topic.endswith('/favicon.ico')):
        return ''

    request_id = request.cookies.get('id')
    if topic is not None and topic.lstrip('/') == ':last':
        if request_id:
            topic = last_query(request_id)
        else:
            return "ERROR: you have to set id for your requests to use /:last\n"
    else:
        if request_id:
            save_query(request_id, topic)

    if request.method == 'POST':
        process_post_request(request, html_needed)
        if html_needed:
            return redirect("/")
        return "OK\n"

    if 'topic' in request.args:
        # Strip leading slashes/backslashes: "/%s" with a topic such as
        # "/evil.example" would produce the scheme-relative URL
        # "//evil.example" and redirect the client to a foreign host.
        return redirect("/%s" % request.args.get('topic').lstrip('/\\'))

    if topic is None:
        topic = ":firstpage"

    if topic.startswith(':shell-x/'):
        return _proxy()

    ip_address = get_request_ip(request)
    if '+' in topic:
        not_allowed = LIMITS.check_ip(ip_address)
        if not_allowed:
            return "429 %s\n" % not_allowed, 429

    # Script results are always delivered as plain text, even to browsers.
    html_is_needed = html_needed and not is_result_a_script(topic)
    result, found = cheat_wrapper(topic, request_options=options, html=html_is_needed)
    if 'Please come back in several hours' in result and html_is_needed:
        return MALFORMED_RESPONSE_HTML_PAGE

    log_query(ip_address, found, topic, user_agent)
    if html_is_needed:
        return result
    return Response(result, mimetype='text/plain')
# Start the gevent WSGI server for `app` on SERVER_ADDRESS:SERVER_PORT.
# NOTE(review): this runs at module level, so importing this module also
# starts the server; serve_forever() is the last statement of the file.
SRV = WSGIServer((SERVER_ADDRESS, SERVER_PORT), app) # log=None)
SRV.serve_forever()