/
jsonrpc.py
312 lines (289 loc) · 14.2 KB
/
jsonrpc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
'''
Created on 2015/8/12
:author: hubo
'''
from vlcp.protocol import Protocol
from vlcp.config import defaultconfig
from vlcp.event import Event, withIndices, ConnectionWriteEvent
import logging
import os
import json
import re
from vlcp.event.event import M_
@withIndices('state', 'connection', 'connmark', 'createby')
class JsonRPCConnectionStateEvent(Event):
    """
    Event announcing that a JSON-RPC connection changed state.

    Indexed by ``state``, ``connection``, ``connmark`` and ``createby``;
    ``state`` is one of the constants defined on this class.
    """
    # The connection went down (closed or reset)
    CONNECTION_DOWN = 'down'
    # The connection came up
    CONNECTION_UP = 'up'
@withIndices('method', 'connection', 'connmark', 'createby')
class JsonRPCRequestEvent(Event):
    """
    Event carrying a request that arrived on a connection.

    Indexed by ``method``, ``connection``, ``connmark`` and ``createby``.
    """
    # Requests are not ignorable by default; see canignorenow() for the
    # condition under which a pending request may be dropped.
    canignore = False

    def canignorenow(self):
        """
        Tell whether this request no longer needs an answer.

        :return: True when the connection is not connected any more, or when
                 its current connmark differs from the one this event was
                 created with; False otherwise.
        """
        conn = self.connection
        still_current = conn.connected and conn.connmark == self.connmark
        return not still_current
@withIndices('connection', 'connmark', 'id', 'iserror', 'createby')
class JsonRPCResponseEvent(Event):
    """
    Event carrying a response that arrived on a connection.

    Indexed by ``connection``, ``connmark``, ``id``, ``iserror`` and
    ``createby``.
    """
@withIndices('method', 'connection', 'connmark', 'createby')
class JsonRPCNotificationEvent(Event):
    """
    Event carrying a notification that arrived on a connection.

    Indexed by ``method``, ``connection``, ``connmark`` and ``createby``.
    """
class JsonFormatException(Exception):
    """
    Raised when incoming data cannot be accepted as a JSON-RPC message
    (malformed framing, exceeded limits, or invalid field values).
    """
class JsonRPCProtocolException(Exception):
    """
    Raised when a JSON-RPC exchange cannot complete, e.g. the connection
    goes down before the awaited reply or notification is received.
    """
class JsonRPCErrorResultException(Exception):
    """
    Raised when a JSON-RPC reply carries an error.

    The exception message is ``str(error)``; the original objects stay
    available on the instance.
    """

    def __init__(self, error, result = None):
        """
        :param error: the ``error`` member of the reply
        :param result: the ``result`` member of the reply, if any
        """
        super().__init__(str(error))
        # Keep the raw values so handlers can inspect them
        self.result = result
        self.error = error
@defaultconfig
class JsonRPC(Protocol):
    '''
    JSON-RPC 1.0 Protocol

    Splits the received byte stream into JSON objects by tracking ``{`` / ``}``
    nesting (see :meth:`parse`), converts every complete object into a
    request, notification or response event, and offers helpers to build
    outgoing messages and to wait for replies and notifications.
    '''
    _default_persist = True
    # This is the OVSDB default port
    _default_defaultport = 6632
    _default_createqueue = True
    # Print debugging log
    _default_debugging = False
    # JSON encoding
    _default_encoding = 'utf-8'
    _default_buffersize = 65536
    # Default limit a JSON message to 16MB for security purpose
    _default_messagelimit = 16777216
    # Default limit JSON scan level to 1024 levels
    _default_levellimit = 1024
    # Limit the allowed request methods
    _default_allowedrequests = None
    _logger = logging.getLogger(__name__ + '.JsonRPC')

    # Scanner patterns used by parse(); each one skips the bytes that are
    # irrelevant in the corresponding parser state.
    # 'begin': whitespace before a message
    _BEGIN_PATTERN = re.compile(br'\s*')
    # 'object': everything up to the next quote or brace
    _OBJECT_PATTERN = re.compile(br'[^"{}]*')
    # 'string': everything up to the closing quote or an escape.
    # Only '"' and '\' may be excluded here: a '^' that is not the first
    # character of a set is a literal, and excluding it would stop the scan
    # at every '^' inside a string and make parse() reject the message.
    _STRING_PATTERN = re.compile(br'[^"\\]*')

    def __init__(self):
        '''
        Constructor
        '''
        Protocol.__init__(self)

    async def init(self, connection):
        """
        Initialize the protocol on a new connection.

        Runs the base class initialization, registers one sub-queue per event
        type (requests, connection state, responses, notifications) and
        records them in ``connection.createdqueues``, then calls
        :meth:`_extra_queues` and :meth:`reconnect_init`.

        :param connection: the connection being initialized
        """
        await Protocol.init(self, connection)
        # NOTE(review): messagepriority / messagequeuesize are not defined in
        # this class; presumably inherited configuration - confirm.
        connection.createdqueues.append(connection.scheduler.queue.addSubQueue(
            self.messagepriority + 2,
            JsonRPCRequestEvent.createMatcher(connection = connection),
            ('request', connection),
            self.messagequeuesize))
        connection.createdqueues.append(connection.scheduler.queue.addSubQueue(
            self.messagepriority,
            JsonRPCConnectionStateEvent.createMatcher(connection = connection),
            ('connstate', connection)))
        connection.createdqueues.append(connection.scheduler.queue.addSubQueue(
            self.messagepriority + 1,
            JsonRPCResponseEvent.createMatcher(connection = connection),
            ('response', connection),
            self.messagequeuesize))
        connection.createdqueues.append(connection.scheduler.queue.addSubQueue(
            self.messagepriority,
            JsonRPCNotificationEvent.createMatcher(connection = connection),
            ('notification', connection),
            self.messagequeuesize))
        await self._extra_queues(connection)
        await self.reconnect_init(connection)

    async def _extra_queues(self, connection):
        """
        Hook called by :meth:`init` after the standard queues are created.
        Does nothing in this class.
        """
        pass

    async def reconnect_init(self, connection):
        """
        Reset the per-connection protocol state and announce CONNECTION_UP.

        :param connection: the connection that has just been (re)established
        """
        # Start request ids at a random value in 1..256
        connection.xid = ord(os.urandom(1)) + 1
        connection.jsonrpc_parserlevel = 0
        connection.jsonrpc_parserstate = 'begin'
        await connection.wait_for_send(JsonRPCConnectionStateEvent(JsonRPCConnectionStateEvent.CONNECTION_UP, connection, connection.connmark, self))

    async def closed(self, connection):
        """
        Handle a closed connection: run the base class handling, let the
        scheduler ignore pending request events of this connection, and
        announce CONNECTION_DOWN.
        """
        await Protocol.closed(self, connection)
        connection.scheduler.ignore(JsonRPCRequestEvent.createMatcher(connection = connection))
        self._logger.info('JSON-RPC connection is closed on %r', connection)
        await connection.wait_for_send(JsonRPCConnectionStateEvent(JsonRPCConnectionStateEvent.CONNECTION_DOWN, connection, connection.connmark, self))

    async def error(self, connection):
        """
        Handle a connection error: run the base class handling, let the
        scheduler ignore pending request events of this connection, and
        announce CONNECTION_DOWN.
        """
        await Protocol.error(self, connection)
        connection.scheduler.ignore(JsonRPCRequestEvent.createMatcher(connection = connection))
        self._logger.warning('JSON-RPC connection is reset on %r', connection)
        await connection.wait_for_send(JsonRPCConnectionStateEvent(JsonRPCConnectionStateEvent.CONNECTION_DOWN, connection, connection.connmark, self))

    def _formatmessage(self, msg, connection):
        """
        Serialize *msg* with ``json.dumps``, encode it with the configured
        encoding and wrap the bytes in a ConnectionWriteEvent for *connection*.

        :return: the ConnectionWriteEvent
        """
        c = ConnectionWriteEvent(connection = connection, connmark = connection.connmark, data = json.dumps(msg).encode(self.encoding))
        if self.debugging:
            self._logger.debug('message formatted: %r', msg)
        return c

    def formatrequest(self, method, params, connection):
        """
        Build a request message and allocate its id from ``connection.xid``.

        :param method: method name
        :param params: parameters, must be JSON-serializable
        :param connection: connection the request will be sent on
        :return: (write event, request id) tuple
        """
        msgid = connection.xid
        msg = {'method': method, 'params': params, 'id': msgid}
        connection.xid += 1
        if connection.xid > 0x7fffffff:
            # Skip xid = 0 for special response
            connection.xid = 1
        return (self._formatmessage(msg, connection), msgid)

    def formatnotification(self, method, params, connection):
        """
        Build a notification message (a request whose ``id`` is null).

        :return: the write event
        """
        return self._formatmessage({'method': method, 'params': params, 'id': None}, connection)

    def formatreply(self, result, requestid, connection):
        """
        Build a successful reply (``error`` is null) to request *requestid*.

        :return: the write event
        """
        return self._formatmessage({'result': result, 'error': None, 'id': requestid}, connection)

    def formaterror(self, error, requestid, connection):
        """
        Build an error reply (``result`` is null) to request *requestid*.

        :return: the write event
        """
        return self._formatmessage({'result': None, 'error': error, 'id': requestid}, connection)

    def replymatcher(self, requestid, connection, iserror = None):
        """
        Create a matcher to match a reply

        :param requestid: id of the request the reply must answer
        :param connection: connection the reply must arrive on (with its
                           current connmark)
        :param iserror: when not None, additionally match on the ``iserror``
                        index of the response event
        """
        matcherparam = {'connection' : connection, 'connmark': connection.connmark,
                        'id': requestid}
        if iserror is not None:
            matcherparam['iserror'] = iserror
        return JsonRPCResponseEvent.createMatcher(**matcherparam)

    def notificationmatcher(self, method, connection):
        """
        Create an event matcher to match specified notifications
        """
        return JsonRPCNotificationEvent.createMatcher(method = method, connection = connection, connmark = connection.connmark)

    def statematcher(self, connection, state = JsonRPCConnectionStateEvent.CONNECTION_DOWN, currentconn = True):
        """
        Create an event matcher to match the connection state

        :param connection: connection to watch
        :param state: state to match, CONNECTION_DOWN by default
        :param currentconn: when True the matcher also includes the current
                            connmark of the connection; when False the
                            connmark is left out of the matcher
        """
        if currentconn:
            return JsonRPCConnectionStateEvent.createMatcher(state, connection, connection.connmark)
        else:
            return JsonRPCConnectionStateEvent.createMatcher(state, connection)

    async def querywithreply(self, method, params, connection, container = None, raiseonerror = True):
        """
        Send a JSON-RPC request and wait for the reply.

        :param method: method name
        :param params: parameters, must be JSON-serializable
        :param connection: connection to send the request on
        :param container: not used by this method
        :param raiseonerror: raise JsonRPCErrorResultException when the reply
                             carries a truthy ``error``
        :return: (result, error) tuple
        :raises JsonRPCProtocolException: the connection went down before the
                                          reply was received
        :raises JsonRPCErrorResultException: see *raiseonerror*
        """
        (c, rid) = self.formatrequest(method, params, connection)
        await connection.write(c, False)
        reply = self.replymatcher(rid, connection)
        conndown = self.statematcher(connection)
        ev, m = await M_(reply, conndown)
        if m is conndown:
            raise JsonRPCProtocolException('Connection is down before reply received')
        result = ev.result
        error = ev.error
        if raiseonerror and error:
            raise JsonRPCErrorResultException(error, result)
        else:
            return (result, error)

    async def waitfornotify(self, method, connection, container):
        """
        Wait for next notification

        :param method: notification method to wait for
        :param connection: connection to wait on
        :param container: not used by this method
        :return: (method, params) from the notification
        :raises JsonRPCProtocolException: the connection went down before a
                                          notification was received
        """
        notify = self.notificationmatcher(method, connection)
        conndown = self.statematcher(connection)
        ev, m = await M_(notify, conndown)
        if m is conndown:
            raise JsonRPCProtocolException('Connection is down before notification received')
        return (ev.method, ev.params)

    def _createevent(self, connection, jsondata):
        """
        Decode one complete JSON text and convert it into an event.

        :param connection: connection the text was received from
        :param jsondata: bytes-like object holding exactly one JSON object
        :return: a JsonRPCRequestEvent, JsonRPCNotificationEvent or
                 JsonRPCResponseEvent; a ConnectionWriteEvent carrying an
                 error reply when the requested method is not allowed; or
                 None when the object has neither ``method`` nor ``result``
        :raises JsonFormatException: ``method`` is null, or a response has a
                                     null ``id``
        """
        if hasattr(jsondata, 'tobytes'):
            # e.g. a memoryview slice: get real bytes before decoding
            jsondata = jsondata.tobytes()
        jsondata = jsondata.decode(self.encoding)
        if self.debugging:
            self._logger.debug('Parsing json text:\n%s', jsondata)
        jsondata = json.loads(jsondata)
        if 'method' in jsondata:
            if jsondata['method'] is None:
                raise JsonFormatException('method is None in input json')
            if jsondata['id'] is not None:
                # Unprocessed requests will block the JSON-RPC connection message queue,
                # as a security consideration, the parser can automatically reject unknown
                # requests
                if self.allowedrequests is not None and str(jsondata['method']) not in self.allowedrequests:
                    return self.formaterror('method is not supported', jsondata['id'], connection)
                event = JsonRPCRequestEvent(method = str(jsondata['method']), params = jsondata['params'],
                                            id = jsondata['id'], connection = connection, connmark = connection.connmark, createby = self)
                self._logger.debug('Request received(method = %r, id = %r, connection = %r)', jsondata['method'], jsondata['id'], connection)
                return event
            # A null id marks a notification
            event = JsonRPCNotificationEvent(method = str(jsondata['method']), params = jsondata['params'],
                                             connection = connection, connmark = connection.connmark, createby = self)
            self._logger.debug('Notification received(method = %r, connection = %r)', str(jsondata['method']), connection)
            return event
        if 'result' in jsondata:
            if jsondata['id'] is None:
                raise JsonFormatException('id is None for a response')
            event = JsonRPCResponseEvent(connection = connection, connmark = connection.connmark,
                                         id = jsondata['id'], iserror = jsondata['error'] is not None,
                                         result = jsondata['result'], error = jsondata['error'], createby = self)
            self._logger.debug('Response received(id = %r, connection = %r)', jsondata['id'], connection)
            return event
        return None

    def parse(self, connection, data, laststart):
        """
        Scan buffered input and extract every complete JSON-RPC message.

        The scanner is a small state machine ('begin', 'object', 'string',
        'escape') that only tracks braces, string quotes and escapes to find
        where each top-level JSON object ends. Its state and nesting level are
        kept on the connection (``jsonrpc_parserstate`` /
        ``jsonrpc_parserlevel``) between calls.

        :param connection: connection the data belongs to
        :param data: bytes-like buffer; the pending message starts at index 0
        :param laststart: index in *data* at which scanning resumes
                          (presumably where the newly received bytes begin -
                          confirm against the caller); equal to ``len(data)``
                          means no new data, which is treated as the remote
                          side closing its write end
        :return: (events, remaining) where *events* is the list of created
                 events and *remaining* is the number of trailing bytes of
                 *data* that do not yet form a complete message
        :raises JsonFormatException: malformed input, or message size /
                                     nesting level above the configured limits
        """
        jsonstart = 0
        start = laststart
        end = len(data)
        events = []
        level = connection.jsonrpc_parserlevel
        state = connection.jsonrpc_parserstate
        # Integer byte values, comparable with single items of the buffer
        _OBJECT_START = b'{'[0]
        _STRING_MARK = b'"'[0]
        _ESCAPE_MARK = b'\\'[0]
        _OBJECT_END = b'}'[0]
        while start < end:
            # We only match {} to find the end position
            if state == 'begin':
                m = self._BEGIN_PATTERN.match(data, start)
                start = m.end()
                if start < end:
                    if data[start] == _OBJECT_START:
                        start += 1
                        level += 1
                        state = 'object'
                    else:
                        raise JsonFormatException('"{" is not found')
            elif state == 'object':
                m = self._OBJECT_PATTERN.match(data, start)
                start = m.end()
                if start < end:
                    if data[start] == _STRING_MARK:
                        start += 1
                        state = 'string'
                    elif data[start] == _OBJECT_START:
                        start += 1
                        level += 1
                    elif data[start] == _OBJECT_END:
                        start += 1
                        level -= 1
                        if level <= 0:
                            # The outermost object is closed: one full message
                            state = 'begin'
                            event = self._createevent(connection, data[jsonstart:start])
                            if event is not None:
                                events.append(event)
                            jsonstart = start
                    else:
                        # Never really reach
                        raise JsonFormatException('How can this be reached...')
            elif state == 'string':
                m = self._STRING_PATTERN.match(data, start)
                start = m.end()
                if start < end:
                    if data[start] == _STRING_MARK:
                        start += 1
                        state = 'object'
                    elif data[start] == _ESCAPE_MARK:
                        start += 1
                        state = 'escape'
                    else:
                        # Never really reach
                        raise JsonFormatException('How can this be reached...')
            else:
                # Escape: skip the escaped byte, whatever it is
                start += 1
                state = 'string'
            # Security check
            if start - jsonstart > self.messagelimit:
                raise JsonFormatException('JSON message size exceeds limit')
            if level > self.levellimit:
                raise JsonFormatException('JSON message level exceeds limit')
        # Save the scanner state for the next call
        connection.jsonrpc_parserlevel = level
        connection.jsonrpc_parserstate = state
        if laststart == len(data):
            # Remote write close
            events.append(ConnectionWriteEvent(connection, connection.connmark, data = b'', EOF = True))
        return (events, len(data) - jsonstart)