-
Notifications
You must be signed in to change notification settings - Fork 24
/
websocket_demo.py
39 lines (31 loc) · 921 Bytes
/
websocket_demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from ws_connection import ClientClosedError
from ws_server import WebSocketServer, WebSocketClient
class TestClient(WebSocketClient):
def __init__(self, conn):
super().__init__(conn)
def process(self):
try:
msg = self.connection.read()
if not msg:
return
msg = msg.decode("utf-8")
items = msg.split(" ")
cmd = items[0]
if cmd == "Hello":
self.connection.write(cmd + " World")
print("Hello World")
except ClientClosedError:
self.connection.close()
class TestServer(WebSocketServer):
def __init__(self):
super().__init__("test.html", 2)
def _make_client(self, conn):
return TestClient(conn)
server = TestServer()
server.start()
try:
while True:
server.process_all()
except KeyboardInterrupt:
pass
server.stop()