-
Notifications
You must be signed in to change notification settings - Fork 287
/
tornadoclient.py
155 lines (122 loc) · 4.94 KB
/
tornadoclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# -*- coding: utf-8 -*-
import ssl
from tornado import iostream, escape
from ws4py.client import WebSocketBaseClient
from ws4py.exc import HandshakeError
__all__ = ['TornadoWebSocketClient']
class TornadoWebSocketClient(WebSocketBaseClient):
def __init__(self, url, protocols=None, extensions=None,
io_loop=None, ssl_options=None, headers=None, exclude_headers=None):
"""
.. code-block:: python
from tornado import ioloop
class MyClient(TornadoWebSocketClient):
def opened(self):
for i in range(0, 200, 25):
self.send("*" * i)
def received_message(self, m):
print((m, len(str(m))))
def closed(self, code, reason=None):
ioloop.IOLoop.instance().stop()
ws = MyClient('ws://localhost:9000/echo', protocols=['http-only', 'chat'])
ws.connect()
ioloop.IOLoop.instance().start()
"""
WebSocketBaseClient.__init__(self, url, protocols, extensions,
ssl_options=ssl_options, headers=headers, exclude_headers=exclude_headers)
if self.scheme == "wss":
self.sock = ssl.wrap_socket(self.sock, do_handshake_on_connect=False, **self.ssl_options)
self._is_secure = True
self.io = iostream.SSLIOStream(self.sock, io_loop, ssl_options=self.ssl_options)
else:
self.io = iostream.IOStream(self.sock, io_loop)
self.io_loop = io_loop
def connect(self):
"""
Connects the websocket and initiate the upgrade handshake.
"""
self.io.set_close_callback(self.__connection_refused)
self.io.connect((self.host, int(self.port)), self.__send_handshake)
def _write(self, b):
"""
Trying to prevent a write operation
on an already closed websocket stream.
This cannot be bullet proof but hopefully
will catch almost all use cases.
"""
if self.terminated:
raise RuntimeError("Cannot send on a terminated websocket")
self.io.write(b)
def __connection_refused(self, *args, **kwargs):
self.server_terminated = True
self.closed(1005, 'Connection refused')
def __send_handshake(self):
self.io.set_close_callback(self.__connection_closed)
self.io.write(escape.utf8(self.handshake_request),
self.__handshake_sent)
def __connection_closed(self, *args, **kwargs):
self.server_terminated = True
self.closed(1006, 'Connection closed during handshake')
def __handshake_sent(self):
self.io.read_until(b"\r\n\r\n", self.__handshake_completed)
def __handshake_completed(self, data):
self.io.set_close_callback(None)
try:
response_line, _, headers = data.partition(b'\r\n')
self.process_response_line(response_line)
protocols, extensions = self.process_handshake_header(headers)
except HandshakeError:
self.close_connection()
raise
self.opened()
self.io.set_close_callback(self.__stream_closed)
self.io.read_bytes(self.reading_buffer_size, self.__fetch_more)
def __fetch_more(self, bytes):
try:
should_continue = self.process(bytes)
except:
should_continue = False
if should_continue:
self.io.read_bytes(self.reading_buffer_size, self.__fetch_more)
else:
self.__gracefully_terminate()
def __gracefully_terminate(self):
self.client_terminated = self.server_terminated = True
try:
if not self.stream.closing:
self.closed(1006)
finally:
self.close_connection()
def __stream_closed(self, *args, **kwargs):
self.io.set_close_callback(None)
code = 1006
reason = None
if self.stream.closing:
code, reason = self.stream.closing.code, self.stream.closing.reason
self.closed(code, reason)
self.stream._cleanup()
def close_connection(self):
"""
Close the underlying connection
"""
self.io.close()
if __name__ == '__main__':
from tornado import ioloop
class MyClient(TornadoWebSocketClient):
def opened(self):
def data_provider():
for i in range(0, 200, 25):
yield "#" * i
self.send(data_provider())
for i in range(0, 200, 25):
self.send("*" * i)
def received_message(self, m):
print("#%d" % len(m))
if len(m) == 175:
self.close()
def closed(self, code, reason=None):
ioloop.IOLoop.instance().stop()
print(("Closed down", code, reason))
ws = MyClient('ws://localhost:9000/ws', protocols=['http-only', 'chat'])
ws.connect()
ioloop.IOLoop.instance().start()