/
output.py
237 lines (183 loc) · 6.78 KB
/
output.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""Output processing and formatting.
"""
import re
import json
import pygments
from pygments import token, lexer
from pygments.styles import get_style_by_name, STYLE_MAP
from pygments.lexers import get_lexer_for_mimetype
from pygments.formatters.terminal import TerminalFormatter
from pygments.formatters.terminal256 import Terminal256Formatter
from pygments.util import ClassNotFound
from requests.compat import is_windows
from . import solarized
from .models import Environment
# Style name used when no explicit `pygments_style` is supplied.
DEFAULT_STYLE = 'solarized'

# The default style first, followed by every key of Pygments' STYLE_MAP.
AVAILABLE_STYLES = [DEFAULT_STYLE]
AVAILABLE_STYLES.extend(STYLE_MAP.keys())

# Text shown in place of a body that could not be decoded as text.
BINARY_SUPPRESSED_NOTICE = '\n'.join([
    '+-----------------------------------------+',
    '| NOTE: binary data not shown in terminal |',
    '+-----------------------------------------+',
])
def _decode_text(raw, encoding):
    """Decode `raw` bytes to text, or return `None` if they look binary.

    `encoding` is tried first (UTF-8 when it is empty). A charset label
    that Python does not know raises `LookupError`; in that case UTF-8 is
    tried instead of letting the error propagate. Bytes that cannot be
    decoded are reported as binary by returning `None`.
    """
    for codec in (encoding or 'utf8', 'utf8'):
        try:
            return raw.decode(codec)
        except LookupError:
            # Unknown charset label: fall through to the UTF-8 attempt.
            continue
        except UnicodeDecodeError:
            # Assume binary.
            return None
    return None


def format(msg, prettifier=None, with_headers=True, with_body=True,
           env=None):
    """Return a UTF8-encoded representation of a `models.HTTPMessage`.

    Sometimes the body contains binary data so we always return `bytes`.

    If `prettifier` is set or the output is a terminal then a binary
    body is not included in the output and is replaced with a notice.

    Generally, when the `stdout` is redirected, the output matches the actual
    message as much as possible. When `--pretty` is set (or implied),
    or when the output is a terminal, then we prefer readability over
    precision.

    :param msg: object exposing `line`, `headers`, `body` (bytes),
        `encoding` and `content_type`.
    :param prettifier: optional object with `process_headers(headers)` and
        `process_body(content, content_type)`; both take and return text.
    :param with_headers: include the start line and headers.
    :param with_body: include the body (when the message has one).
    :param env: object whose `stdout_isatty` tells whether output goes to
        a terminal; a fresh `Environment()` is created when omitted.
    :return: the formatted message as `bytes`.

    """
    if env is None:
        # Created per call: an instance placed in the signature would be
        # built once at import time and shared by every call.
        env = Environment()

    chunks = []

    if with_headers:
        headers = '\n'.join([msg.line, msg.headers])
        if prettifier:
            headers = prettifier.process_headers(headers)
        chunks.append(headers.strip().encode('utf8'))

        # Blank line after the headers: always on a terminal, otherwise
        # only when a body follows.
        if with_body and msg.body or env.stdout_isatty:
            chunks.append(b'\n\n')

    if with_body and msg.body:
        # Redirected, un-prettified output passes the raw bytes through.
        body = msg.body

        if prettifier or env.stdout_isatty:
            text = _decode_text(msg.body, msg.encoding)
            if text is None:
                body = BINARY_SUPPRESSED_NOTICE.encode('utf8')
                if not with_headers:
                    # No header block precedes the notice, so set it off
                    # with a leading newline.
                    body = b'\n' + body
            else:
                if prettifier and msg.content_type:
                    text = prettifier.process_body(
                        text, msg.content_type).strip()
                body = text.encode('utf8')

        chunks.append(body)

    return b''.join(chunks)
class HTTPLexer(lexer.RegexLexer):
    """Simplified HTTP lexer for Pygments.

    It only operates on headers and provides a stronger contrast between
    their names and values than the original one bundled with Pygments
    (`pygments.lexers.text import HttpLexer`), especially when
    Solarized color scheme is used.

    """
    name = 'HTTP'
    aliases = ['http']
    filenames = ['*.http']

    # Rules are tried in the order listed; each regex group is paired
    # positionally with one token type in `bygroups`.
    tokens = {
        'root': [
            # Request-Line, e.g. "GET /path HTTP/1.1"
            (
                r'([A-Z]+)( +)([^ ]+)( +)(HTTP)(/)(\d+\.\d+)',
                lexer.bygroups(
                    token.Name.Function,     # Method
                    token.Text,
                    token.Name.Namespace,    # Request target
                    token.Text,
                    token.Keyword.Reserved,  # 'HTTP'
                    token.Operator,          # '/'
                    token.Number,            # Version
                ),
            ),
            # Response Status-Line, e.g. "HTTP/1.1 200 OK"
            (
                r'(HTTP)(/)(\d+\.\d+)( +)(\d{3})( +)(.+)',
                lexer.bygroups(
                    token.Keyword.Reserved,  # 'HTTP'
                    token.Operator,          # '/'
                    token.Number,            # Version
                    token.Text,
                    token.Number,            # Status code
                    token.Text,
                    token.Name.Exception,    # Reason
                ),
            ),
            # Header, e.g. "Content-Type: text/html"
            (
                r'(.*?)( *)(:)( *)(.+)',
                lexer.bygroups(
                    token.Name.Attribute,    # Name
                    token.Text,
                    token.Operator,          # Colon
                    token.Text,
                    token.String,            # Value
                ),
            ),
        ],
    }
class BaseProcessor(object):
    """Pass-through processor that leaves headers and body unchanged.

    Holds the environment and any extra keyword options it was
    constructed with; both `process_*` methods return their input as-is.

    """

    # Class-level default; an instance can set this to False.
    enabled = True

    def __init__(self, env, **kwargs):
        """Store `env` and the remaining keyword options on the instance."""
        self.env, self.kwargs = env, kwargs

    def process_headers(self, headers):
        """Return `headers` unchanged."""
        return headers

    def process_body(self, content, content_type):
        """Return `content` unchanged; `content_type` is ignored."""
        return content
class JSONProcessor(BaseProcessor):
    """Re-indent `application/json` bodies and sort their keys."""

    def process_body(self, content, content_type):
        """Return `content` pretty-printed when it is valid JSON.

        Bodies of any other content type, and bodies that fail to parse,
        are returned unchanged.

        """
        if content_type != 'application/json':
            return content

        try:
            parsed = json.loads(content)
            # Indent and sort the JSON data.
            return json.dumps(
                parsed,
                sort_keys=True,
                ensure_ascii=False,
                indent=4,
            )
        except ValueError:
            # Invalid JSON - we don't care.
            return content
class PygmentsProcessor(BaseProcessor):
    """Highlight headers and bodies with Pygments.

    The processor disables itself when `env.colors` is falsy.

    """

    def __init__(self, *args, **kwargs):
        """Pick the style and terminal formatter used for highlighting."""
        super(PygmentsProcessor, self).__init__(*args, **kwargs)

        if not self.env.colors:
            # No formatter is created in this case.
            self.enabled = False
            return

        style_name = self.kwargs.get('pygments_style', DEFAULT_STYLE)
        try:
            style = get_style_by_name(style_name)
        except ClassNotFound:
            # Pygments has no style by that name: use the bundled one.
            style = solarized.SolarizedStyle

        wants_256 = is_windows or self.env.colors == 256
        fmt_class = Terminal256Formatter if wants_256 else TerminalFormatter
        self.formatter = fmt_class(style=style)

    def process_headers(self, headers):
        """Return `headers` highlighted with `HTTPLexer`."""
        return pygments.highlight(headers, HTTPLexer(), self.formatter)

    def process_body(self, content, content_type):
        """Return `content` highlighted with the lexer for `content_type`.

        When Pygments has no lexer for the MIME type the content is
        returned unchanged.

        """
        try:
            body_lexer = get_lexer_for_mimetype(content_type)
        except ClassNotFound:
            return content
        return pygments.highlight(content, body_lexer, self.formatter)
class OutputProcessor(object):
    """Apply every enabled processor to headers and bodies, in order."""

    # Processor classes instantiated by `__init__`; they are applied in
    # the order listed here.
    installed_processors = [
        JSONProcessor,
        PygmentsProcessor
    ]

    def __init__(self, env, **kwargs):
        """Instantiate the installed processors and keep the enabled ones.

        `env` and `kwargs` are forwarded unchanged to each processor class.

        """
        candidates = [
            cls(env, **kwargs)
            for cls in self.installed_processors
        ]
        self.processors = [p for p in candidates if p.enabled]

    def process_headers(self, headers):
        """Return `headers` after passing them through each processor."""
        for processor in self.processors:
            headers = processor.process_headers(headers)
        return headers

    def process_body(self, content, content_type):
        """Return `content` after passing it through each processor.

        `content_type` is reduced to its bare media type first: parameters
        after `;` are dropped, surrounding whitespace is removed and the
        value is lower-cased. A structured-syntax type such as
        `application/vnd.example+json` is collapsed to `application/json`
        (likewise for `+xml`) before being handed to the processors.

        """
        # Media types are case-insensitive and may be followed by optional
        # whitespace before the first ';', so normalise both.
        content_type = content_type.split(';')[0].strip().lower()

        application_match = re.match(
            r'application/(.+\+)(json|xml)$',
            content_type
        )
        if application_match:
            # Strip vendor and extensions from Content-Type
            content_type = 'application/' + application_match.group(2)

        for processor in self.processors:
            content = processor.process_body(content, content_type)

        return content