This repository has been archived by the owner on Sep 19, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
/
documents.py
103 lines (83 loc) · 3.18 KB
/
documents.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from mongoengine import *
import json
class UserAgent(EmbeddedDocument):
browser = StringField()
language = StringField()
platform = StringField()
string = StringField()
version = StringField()
class Tracking(Document):
#session_key = models.CharField(max_length=40, null=True, blank=True, db_index=True)
date_created = DateTimeField()
host = StringField()
path = StringField()
query_params = StringField()
ip = StringField()
user = GenericReferenceField()
user_agent = EmbeddedDocumentField(UserAgent)
method = StringField()
request_headers = ListField()
request_body = BinaryField()
status_code = IntField()
response_headers = ListField()
response_body = BinaryField()
# Execution time in ms
execution_time = IntField()
# System hostname
hostname = StringField()
custom_data = DynamicField()
meta = {
'max_documents': 10**6, # 1 million
}
def user_repr(self):
if self._data['user']:
if isinstance(self._data['user'], dict):
return self._data['user']['_ref'].id
else:
return self.user.id
else:
return '-'
def __unicode__(self):
return '{id} {date} {method} {user} {path}{query} {status} ({time} ms)'.format(
id=self.id,
date=self.date_created.strftime('%Y-%m-%d %H:%M:%S.%f'),
method=self.method,
user=self.user_repr(),
path=self.path,
query=self.query_params and '?%s' % self.query_params or '',
status=self.status_code,
time=self.execution_time)
def debug(self):
ret = '%s %s%s%s\n' % (self.method, self.host, self.path, self.query_params and '?%s' % self.query_params or '')
ret += 'REQUEST:\n'
ret += self.format_headers(self.request_headers) + '\n'
ret += self.format_body(self.request_body) + '\n'
ret += '%s RESPONSE:\n' % self.status_code
ret += self.format_headers(self.response_headers) + '\n'
ret += self.format_body(self.response_body)
return ret
def get_header(self, name, default=''):
return { h[0]: h[1] for h in self.request_headers }.get(name, default)
def replay(self):
from flask import current_app
client = current_app.test_client()
# Make sure we don't send invalid cookies.
client.cookie_jar.clear()
full_path = self.path + ('?'+self.query_params if self.query_params else '')
method_func = getattr(client, self.method.lower())
return method_func(
full_path,
headers=self.request_headers,
data=self.request_body,
content_type=dict(self.request_headers)['Content-Type']
)
@staticmethod
def format_body(inpt):
"""Format an HTTP body as JSON if possible, otherwise return string"""
try:
return json.dumps(json.loads(inpt.decode('utf8')), indent=4)
except ValueError:
return repr(inpt)
@staticmethod
def format_headers(headers):
return '\n'.join([' %s: %s' % (h[0], h[1] if len(h[1]) < 100 else '%s...' % h[1][:100]) for h in headers])