/
advanceddispatcher.py
139 lines (121 loc) · 4.58 KB
/
advanceddispatcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import socket
import threading
import time
import asyncore_pollchoose as asyncore
from debug import logger
from helper_threading import BusyError, nonBlocking
import state
class ProcessingError(Exception):
    """Base class for errors raised while processing buffered data."""
class UnknownStateError(ProcessingError):
    """Raised when the current state has no matching ``state_*`` handler."""
class AdvancedDispatcher(asyncore.dispatcher):
    """Dispatcher with read/write buffers and a name-based state machine.

    Incoming data is accumulated in ``read_buf`` and outgoing data is queued
    in ``write_buf``.  ``process()`` repeatedly looks up and calls the method
    named ``"state_" + self.state`` until one of them returns a false value.
    """

    _buf_len = 131072  # 128kB

    def __init__(self, sock=None):
        """Set up buffers, counters and locks.

        :param sock: passed through to ``asyncore.dispatcher.__init__``.
        """
        # The base initialiser is skipped when the instance already has a
        # ``_map`` attribute (presumably because a subclass initialised the
        # dispatcher earlier -- confirm against callers).
        if not hasattr(self, '_map'):
            asyncore.dispatcher.__init__(self, sock)
        self.read_buf = bytearray()
        self.write_buf = bytearray()
        self.state = "init"
        self.lastTx = time.time()
        self.sentBytes = 0
        self.receivedBytes = 0
        self.expectBytes = 0
        self.readLock = threading.RLock()
        self.writeLock = threading.RLock()
        self.processingLock = threading.RLock()

    def append_write_buf(self, data):
        """Queue ``data`` for sending.

        :param data: a bytes-like object, or a ``list`` of them which are
            appended in order.  Falsy values are ignored.
        """
        if not data:
            return
        with self.writeLock:
            if isinstance(data, list):
                for chunk in data:
                    self.write_buf.extend(chunk)
            else:
                self.write_buf.extend(data)

    def slice_write_buf(self, length=0):
        """Drop the first ``length`` bytes of the write buffer.

        Non-positive lengths are a no-op; a length at or beyond the buffer
        size empties the buffer.
        """
        if length > 0:
            with self.writeLock:
                # Slice deletion clamps to the buffer size on its own.
                del self.write_buf[:length]

    def slice_read_buf(self, length=0):
        """Drop the first ``length`` bytes of the read buffer.

        Non-positive lengths are a no-op; a length at or beyond the buffer
        size empties the buffer.
        """
        if length > 0:
            with self.readLock:
                # Slice deletion clamps to the buffer size on its own.
                del self.read_buf[:length]

    def process(self):
        """Run state handlers while connected and not shutting down.

        Each iteration takes ``processingLock`` through ``nonBlocking`` and
        stops when the connection is gone, shutdown was requested, fewer
        than ``expectBytes`` bytes are buffered, or the current handler
        returns a false value.  A ``BusyError`` raised inside the loop also
        ends processing.

        :returns: always ``False``.
        :raises UnknownStateError: if there is no ``state_<name>`` attribute
            for the current state.
        """
        while self.connected and not state.shutdown:
            try:
                with nonBlocking(self.processingLock):
                    # Re-check under the lock: the flags may have changed
                    # since the loop condition was evaluated.
                    if not self.connected or state.shutdown:
                        break
                    if len(self.read_buf) < self.expectBytes:
                        return False
                    try:
                        cmd = getattr(self, "state_" + str(self.state))
                    except AttributeError:
                        logger.error(
                            "Unknown state %s", self.state, exc_info=True)
                        raise UnknownStateError(self.state)
                    if not cmd():
                        break
            except BusyError:
                return False
        return False

    def set_state(self, state, length=0, expectBytes=0):
        """Switch to a new state.

        :param state: name of the new state (the ``state_`` prefix is added
            by ``process()``).  Note: this parameter shadows the imported
            ``state`` module inside this method.
        :param length: number of bytes to drop from the front of the read
            buffer.
        :param expectBytes: minimum number of buffered bytes ``process()``
            requires before calling the new state's handler.
        """
        self.expectBytes = expectBytes
        self.slice_read_buf(length)
        self.state = state

    def writable(self):
        """Compute ``uploadChunk`` and report whether writing is wanted.

        ``uploadChunk`` starts at ``_buf_len``, is replaced by
        ``int(asyncore.uploadBucket)`` when ``asyncore.maxUploadRate`` is
        positive, and is capped at the amount of queued data.
        """
        self.uploadChunk = AdvancedDispatcher._buf_len
        if asyncore.maxUploadRate > 0:
            self.uploadChunk = int(asyncore.uploadBucket)
        self.uploadChunk = min(self.uploadChunk, len(self.write_buf))
        return asyncore.dispatcher.writable(self) and (
            self.connecting or (self.connected and self.uploadChunk > 0))

    def readable(self):
        """Compute ``downloadChunk`` and report whether reading is wanted.

        ``downloadChunk`` starts at ``_buf_len`` and is replaced by
        ``int(asyncore.downloadBucket)`` when ``asyncore.maxDownloadRate``
        is positive.  While ``expectBytes`` is positive and
        ``fullyEstablished`` is false, it is further limited to the bytes
        still missing from the read buffer (never below zero).
        """
        self.downloadChunk = AdvancedDispatcher._buf_len
        if asyncore.maxDownloadRate > 0:
            self.downloadChunk = int(asyncore.downloadBucket)
        try:
            if self.expectBytes > 0 and not self.fullyEstablished:
                missing = self.expectBytes - len(self.read_buf)
                self.downloadChunk = max(0, min(self.downloadChunk, missing))
        except AttributeError:
            # ``fullyEstablished`` is not set by this class; when it is
            # absent the chunk size is left as computed above.
            pass
        return asyncore.dispatcher.readable(self) and (
            self.connecting or self.accepting or
            (self.connected and self.downloadChunk > 0))

    def handle_read(self):
        """Receive up to ``downloadChunk`` bytes into the read buffer."""
        self.lastTx = time.time()
        newData = self.recv(self.downloadChunk)
        self.receivedBytes += len(newData)
        asyncore.update_received(len(newData))
        with self.readLock:
            self.read_buf.extend(newData)

    def handle_write(self):
        """Send up to ``uploadChunk`` bytes and drop what was sent."""
        self.lastTx = time.time()
        written = self.send(self.write_buf[0:self.uploadChunk])
        asyncore.update_sent(written)
        self.sentBytes += written
        self.slice_write_buf(written)

    def handle_connect_event(self):
        """Delegate to the base class, ignoring disconnect-type errors.

        A ``socket.error`` whose first argument is in
        ``asyncore._DISCONNECTED`` is swallowed; any other is re-raised.
        """
        try:
            asyncore.dispatcher.handle_connect_event(self)
        except socket.error as e:
            if e.args[0] not in asyncore._DISCONNECTED:
                raise

    def handle_connect(self):
        """Record the connection time as the last activity timestamp."""
        self.lastTx = time.time()

    def state_close(self):
        """Handler for the ``close`` state; returns ``False`` to stop
        ``process()``."""
        return False

    def handle_close(self):
        """Discard both buffers, enter the ``close`` state and close."""
        with self.readLock:
            self.read_buf = bytearray()
        with self.writeLock:
            self.write_buf = bytearray()
        self.set_state("close")
        self.close()