-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pwebarc_dumb_dump_server.py
executable file
·195 lines (160 loc) · 7.08 KB
/
pwebarc_dumb_dump_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#!/usr/bin/env python3
# A very simple archiving server for pWebArc.
#
# Copyright (c) 2023 Jan Malakhovski <oxij@oxij.org>
#
# This file can be distributed under the terms of the GNU GPL, version 3 or later.
import argparse
import gzip
import io
import os
import sys
import threading
import time
import urllib.parse as up
from wsgiref.validate import validator
from wsgiref.simple_server import make_server
# Work out the version string reported by `--version`: use the installed
# package metadata when it can be looked up, and fall back to "dev" otherwise
# (e.g. when `importlib.metadata` is missing or the lookup fails).
try:
    from importlib import metadata as meta
except Exception:
    version = "dev"
else:
    try:
        version = meta.version(__package__)
    except Exception:
        version = "dev"

# Placeholder for the optional `cbor2` module; stays `None` unless something
# rebinds it later.
cbor2 = None

# PID of this process as a string, computed once at import time.
mypid = f"{os.getpid()}"
class HTTPDumpServer(threading.Thread):
    """HTTP server that accepts HTTP dumps as POST data, tries to compress them
    with gzip, and saves them in a given directory.

    Dumps are accepted as ``POST /pwebarc/dump`` requests with a
    ``Content-Type`` of ``application/cbor`` and are written to
    ``<root>/<profile>/<year>/<month>/<day>/<epoch>_<pid>_<number>.wrr``.

    This runs in a separate thread so that KeyboardInterrupt and such
    would not interrupt a dump in the middle.
    """

    def __init__(self, host, port, root, uncompressed, default_profile, ignore_profiles, *args, **kwargs):
        """Create the WSGI server and remember the dumping options.

        host, port -- address to listen on.
        root -- directory to dump into (``~`` is expanded).
        uncompressed -- when true, never gzip the dumps.
        default_profile -- profile used when the request names none.
        ignore_profiles -- when true, always use ``default_profile``.
        Remaining arguments are passed on to ``threading.Thread``.
        """
        super().__init__(*args, **kwargs)
        self.httpd = make_server(host, port, validator(self.handle_request))
        self.root = os.path.expanduser(root)
        self.uncompressed = uncompressed
        self.default_profile = default_profile
        self.ignore_profiles = ignore_profiles
        # second of the previous dump and its sequence number within that second
        self.prevsec = 0
        self.num = 0
        print(f"Listening for archive requests on http://{host}:{port}/pwebarc/dump")

    def run(self):
        """Thread body: serve requests until `stop` is called."""
        self.httpd.serve_forever()

    def stop(self):
        """Stop serving requests and release the listening socket."""
        self.httpd.shutdown()
        # `shutdown` only ends the serve loop; the socket has to be closed explicitly
        self.httpd.server_close()

    @staticmethod
    def _sanitize_profile(profile):
        """Turn a client-supplied profile name into a safe relative path.

        Empty, `.`, `..` and drive-qualified components are dropped so that
        the result can never point outside of the dump root. Returns `""`
        when nothing usable remains.
        """
        if os.altsep:
            profile = profile.replace(os.altsep, "/")
        profile = profile.replace(os.sep, "/")
        parts = [
            part
            for part in profile.split("/")
            if part not in ("", os.curdir, os.pardir) and not os.path.splitdrive(part)[0]
        ]
        return os.path.join(*parts) if parts else ""

    def handle_request(self, environ, start_response):
        """WSGI application: archive one dump, or reply with an error.

        Replies `404 Not Found` to anything but `POST /pwebarc/dump`,
        `400 Bad Request` to a wrong content type, a missing or invalid
        `Content-Length`, or a body shorter than announced, and `200 OK`
        once the dump has been written to disk.
        """

        def end_with(explanation, more):
            start_response(explanation, [("Content-Type", "text/plain; charset=utf-8")])
            yield more

        method = environ["REQUEST_METHOD"]
        path = environ["PATH_INFO"]
        if method != "POST" or path != "/pwebarc/dump":
            yield from end_with("404 Not Found", b"")
            return

        # sanity check
        ctype = environ.get("CONTENT_TYPE", "")
        if ctype != "application/cbor":
            yield from end_with("400 Bad Request", b"expecting CBOR data")
            return

        params = up.parse_qs(environ.get("QUERY_STRING", ""))

        profile = ""
        if not self.ignore_profiles and "profile" in params:
            # the value comes from the network, so never use it as a path verbatim
            profile = self._sanitize_profile(params["profile"][0])
        if profile == "":
            profile = self.default_profile

        try:
            todo = int(environ.get("CONTENT_LENGTH") or "")
        except ValueError:
            todo = -1
        if todo < 0:
            yield from end_with("400 Bad Request", b"expecting a valid Content-Length")
            return

        # read request body data
        fp = environ["wsgi.input"]
        chunks = []
        while todo > 0:
            res = fp.read(todo)
            if not res:
                # the client went away early; without this check the loop would never end
                yield from end_with("400 Bad Request", b"request body is shorter than Content-Length")
                return
            chunks.append(res)
            todo -= len(res)
        data = b"".join(chunks)
        del chunks

        if cbor2 is not None:
            # print a (possibly abridged) representation of the dump;
            # a decoding error propagates from here, before anything is written
            rparsed = repr(cbor2.loads(data))
            if len(rparsed) < 3000:
                print("parsed", rparsed)
            else:
                print("parsed", rparsed[:1500])
                print("...")
                print(rparsed[-1500:])
            del rparsed

        if not self.uncompressed:
            # gzip it, if it gzips
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, filename="", mtime=0, mode="wb", compresslevel=9) as gz:
                gz.write(data)
            compressed_data = buf.getvalue()

            if len(compressed_data) < len(data):
                data = compressed_data

            # free the memory immediately
            del buf
            del compressed_data

        # write it out to a file in
        # {self.root}/<profile>/<year>/<month>/<day>/<epoch>_<pid>_<number>.wrr

        # because time.time() gives a float
        epoch = time.time_ns() // 1000000000

        # number reqres sequentially while in the same second
        if self.prevsec != epoch:
            self.num = 0
        else:
            self.num += 1
        self.prevsec = epoch

        dd = [format(x, "02") for x in time.gmtime(epoch)[0:3]]
        directory = os.path.join(self.root, profile, *dd)
        path = os.path.join(directory, f"{epoch}_{mypid}_{self.num}.wrr")
        os.makedirs(directory, exist_ok=True)

        # write to a temporary name first so that a `.wrr` file is never partial
        tmp_path = path + ".part"
        try:
            with open(tmp_path, "wb") as f:
                f.write(data)
        except Exception:
            # best-effort cleanup of the partial file, then report the original error
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise

        os.rename(tmp_path, path)
        print("dumped", path)

        yield from end_with("200 OK", b"")
def main():
    """Command-line entry point: parse options and run the server until interrupted.

    Handles `--help` itself (so that the help text can be printed unwrapped
    when stdout is not a terminal), optionally enables printing of parsed
    CBORs when the `cbor2` module can be imported, then starts an
    `HTTPDumpServer` thread and waits for it, stopping it on
    KeyboardInterrupt.
    """
    global cbor2

    parser = argparse.ArgumentParser(
        prog=__package__,
        description="Simple archiving server for pWebArc. Dumps each request to `<ROOT>/<profile>/<year>/<month>/<day>/<epoch>_<pid>_<number>.wrr`.",
        add_help=False,
    )
    parser.add_argument("-h", "--help", action="store_true", help="show this help message and exit")
    # `%(prog)s` is filled in by argparse, so this also works when `__package__` is unset
    parser.add_argument("--version", action="version", version=f"%(prog)s {version}")
    parser.add_argument("--host", default="127.0.0.1", type=str, help="listen on what host/IP (default: 127.0.0.1)")
    parser.add_argument("--port", default=3210, type=int, help="listen on what port (default: 3210)")
    parser.add_argument("--root", default="pwebarc-dump", type=str, help="path to dump data into (default: pwebarc-dump)")
    parser.add_argument("--uncompressed", action="store_true", help="dump new archivals to disk without compression; the default is to try to compress each new archive first")
    parser.add_argument("--default-profile", metavar="NAME", default="default", type=str, help="default profile to use when no `profile` query parameter is supplied by the extension (default: `default`)")
    parser.add_argument("--ignore-profiles", action="store_true", help="ignore `profile` query parameter supplied by the extension and use the value of `--default-profile` instead")
    parser.add_argument("--no-print-cbors", action="store_true", help="don't print parsed representations of newly archived CBORs to stdout even if `cbor2` module is available")

    args = parser.parse_args(sys.argv[1:])

    if args.help:
        if not sys.stdout.isatty():
            # not a terminal: use very wide lines so the help text is not wrapped
            parser.formatter_class = lambda *a, **kw: argparse.HelpFormatter(*a, width=1024, **kw)
        print(parser.format_help())
        sys.exit(0)

    if not args.no_print_cbors:
        try:
            import cbor2 as cbor2_
        except ImportError:
            sys.stderr.write("warning: `cbor2` module is not available, forcing `--no-print-cbors` option\n")
            sys.stderr.flush()
        else:
            cbor2 = cbor2_
            del cbor2_

    t = HTTPDumpServer(args.host, args.port, args.root, args.uncompressed, args.default_profile, args.ignore_profiles)
    t.start()
    try:
        t.join()
    except KeyboardInterrupt:
        print("Interrupted.")
        # let the server thread finish what it is doing before exiting
        t.stop()
        t.join()


# Run the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()