/
protocol.py
172 lines (151 loc) · 6.42 KB
/
protocol.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
'''
Created on 2015/6/29
:author: hubo
'''
from vlcp.config import Configurable, defaultconfig
from vlcp.event.connection import ConnectionWriteEvent
from vlcp.event.core import syscall_clearqueue, syscall_removequeue, syscall_clearremovequeue
from logging import getLogger
from socket import SOL_SOCKET, SO_ERROR
from ssl import PROTOCOL_SSLv23
import errno
@defaultconfig
class Protocol(Configurable):
    '''
    Protocol base class

    Holds the default configuration values for a protocol and provides the
    hook routines (``init``, ``error``, ``closed``, ``final``, ...) that are
    given a connection or server object. Subclasses are expected to override
    ``parse`` (which raises ``NotImplementedError`` here) and any hooks they need.
    '''
    # Message event priority for this protocol
    _default_messagepriority = 400
    # Data write event priority for this protocol
    _default_writepriority = 600
    # Create separated queues for data write events from each connection
    _default_createqueue = False
    # Time to wait before cleaning up the created queues for each connection
    _default_cleanuptimeout = 60
    # Data write event queue size for each connection
    _default_writequeuesize = 10
    # Message event queue size for each connection
    _default_messagequeuesize = 10
    # Enable keep-alive for this protocol: send protocol specified keep-alive packages when
    # no data is read from the connection to detect the connection liveness
    _default_keepalivetime = None
    # Send protocol specified keep-alive packages when no data is written to the connection
    _default_writekeepalivetime = None
    # Use SO_REUSEPORT socket option for the connections, so that multiple processes can bind to
    # the same port; can be used to create load-balanced services
    _default_reuseport = False
    # This protocol should automatically reconnect when the connection is disconnected unexpectedly
    _default_persist = False
    # Default read buffer size for this protocol. When the buffer is not large enough to contain a
    # single message, the buffer will automatically be enlarged.
    _default_buffersize = 4096
    # Connect timeout for this protocol
    _default_connect_timeout = 30
    # Enable TCP_NODELAY option for this protocol
    _default_tcp_nodelay = False
    # Enabled SSL version, default to PROTOCOL_SSLv23, configure it to a TLS version for more security
    _default_sslversion = PROTOCOL_SSLv23
    # Server socket should retry listening if failed to bind to the specified address
    _default_listen_persist = True
    # Retry interval for listen
    _default_retrylisten_interval = 3
    # Default listen backlog size
    _default_backlogsize = 2048
    vhost = '<other>'
    _logger = getLogger(__name__ + '.Protocol')

    def __init__(self):
        '''
        Constructor
        '''
        # Explicit base call kept (instead of super()) because the class is
        # wrapped by the @defaultconfig decorator.
        Configurable.__init__(self)

    def parse(self, connection, data, laststart):
        '''
        Parse input data into events. Must be implemented by subclasses.

        :param connection: connection object
        :param data: view for input data
        :param laststart: last parsed position
        :returns: (events, keep) where events are parsed events to send, keep is the
                  unused data length to be kept for next parse.
        :raises NotImplementedError: always, in this base class
        '''
        raise NotImplementedError

    def serialize(self, connection, event):
        '''
        Serialize a write event to bytes, and return if it is EOF

        :param connection: connection object
        :param event: write event; its ``data`` attribute is returned as-is
        :returns: (bytes, EOF) where EOF is ``event.EOF`` or False when the
                  event has no such attribute
        '''
        return (event.data, getattr(event, 'EOF', False))

    async def init(self, connection):
        '''
        routine for connection initialization

        Resets ``connection.createdqueues`` and, when ``createqueue`` is enabled,
        creates a dedicated sub-queue for the write events of this connection and
        stores it in ``connection.queue``.

        :param connection: connection object
        '''
        try:
            connection.createdqueues = []
            if self.createqueue:
                connection.queue = connection.scheduler.queue.addSubQueue(
                    self.writepriority,
                    ConnectionWriteEvent.createMatcher(connection = connection),
                    ('write', connection),
                    self.writequeuesize)
                connection.createdqueues.append(connection.queue)
        except IndexError:
            # Deliberately best-effort: initialization continues without a new queue.
            # NOTE(review): presumably raised by addSubQueue for an already
            # registered queue name - confirm against the queue implementation.
            self._logger.debug('IndexError raised while creating write queue for connection %r, skipped',
                               connection, exc_info = True)

    async def _clearwritequeue(self, connection):
        '''
        Clear the write queue of the connection, if one was attached to it

        :param connection: connection object
        '''
        if hasattr(connection, 'queue'):
            await connection.syscall(syscall_clearqueue(connection.queue))

    async def error(self, connection):
        '''
        routine for connection error

        Logs the pending socket error (SO_ERROR), asks the scheduler to ignore
        write events matching this connection and clears its write queue.

        :param connection: connection object
        '''
        err = connection.socket.getsockopt(SOL_SOCKET, SO_ERROR)
        self._logger.warning('Connection error status: %d(%s)', err, errno.errorcode.get(err, 'Not found'))
        connection.scheduler.ignore(ConnectionWriteEvent.createMatcher(connection = connection))
        await self._clearwritequeue(connection)

    async def closed(self, connection):
        '''
        routine for connection closed

        Asks the scheduler to ignore write events matching this connection
        and clears its write queue.

        :param connection: connection object
        '''
        connection.scheduler.ignore(ConnectionWriteEvent.createMatcher(connection = connection))
        await self._clearwritequeue(connection)

    async def notconnected(self, connection):
        '''
        routine for connect failed and not retrying

        :param connection: connection object; its ``rawurl`` is logged
        '''
        self._logger.warning('Connect failed and not retrying for url: %s', connection.rawurl)

    async def reconnect_init(self, connection):
        '''
        routine for reconnect; does nothing in the base class

        :param connection: connection object
        '''
        pass

    def accept(self, server, newaddr, newsocket):
        '''
        server accept

        :param server: server object that accepted the connection
        :param newaddr: address of the accepted peer
        :param newsocket: accepted socket
        :returns: new protocol object; the base class returns itself
        '''
        # Lazy %-args: repr(newaddr) is only evaluated when DEBUG is enabled
        self._logger.debug('Connection accepted from %r', newaddr)
        return self

    async def final(self, connection):
        '''
        routine for a connection finally ends: all connections are closed and not retrying

        Waits up to ``cleanuptimeout`` for the queues created for this connection
        to become empty, then clears and removes them from the scheduler queue.

        :param connection: connection object
        '''
        created = getattr(connection, 'createdqueues', None)
        if created:
            timeout, _ = await connection.execute_with_timeout(
                                self.cleanuptimeout,
                                connection.wait_for_all_empty(*created))
            if timeout:
                self._logger.warning('Events are still not processed after timeout, Protocol = %r, Connection = %r', self, connection)
            # Queues are removed even after a timeout
            for q in created:
                await connection.syscall(syscall_clearremovequeue(connection.scheduler.queue, q))
            # Empty the list in place so the connection keeps the same list object
            del created[:]

    async def beforelisten(self, tcpserver, newsocket):
        '''
        routine before a socket entering listen mode; does nothing in the base class

        :param tcpserver: server object
        :param newsocket: socket which is about to listen
        '''
        pass

    async def serverfinal(self, tcpserver):
        '''
        routine for a tcpserver finally shutdown or not connected;
        does nothing in the base class

        :param tcpserver: server object
        '''
        pass

    async def keepalive(self, connection):
        '''
        routine executed when there has been a long time since last data arrival.
        Check if the connection is down. Does nothing in the base class.

        :param connection: connection object
        '''
        pass