Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 126 lines (104 sloc) 4.013 kb
04aa9521 »
2011-01-14 initial checkin
1 # repsocket.py
2 #
3 # A durable reply socket.
4
5 import socket
6 import msgsocket
7 import threading
8 import queue
9 import msgauth
10 import pickle
11
12 # Internal object used to store reply data
class ReplyData:
    """Mutable slot used to pass one reply to a waiting thread.

    Attributes are plain instance fields:
      evt -- a threading.Event, initially unset
      msg -- the reply payload, None until one is assigned
    """

    def __init__(self):
        # A fresh slot has no payload and an unset event.
        self.evt = threading.Event()
        self.msg = None
17
class ReplySocket:
    """Reply socket that serves many clients through one recv()/send() loop.

    bind() starts a daemon acceptor thread; every accepted connection gets its
    own daemon handler thread.  Handler threads push each received message,
    paired with a reply slot, onto an internal queue.  The owner of the
    ReplySocket alternates strictly between recv()/recv_bytes() and
    send()/send_bytes(): each send answers the message returned by the
    immediately preceding receive.
    """

    def __init__(self):
        self._messages = queue.Queue()   # Received (message, reply slot) pairs
        self._pending_reply = None       # Reply slot of the message awaiting send()
        self._sock = None                # Listening socket, created by the acceptor thread

    # Bind the socket to a given address and start an acceptor thread
    def bind(self, address, authkey=b"default"):
        """Start listening on *address* in a background daemon thread.

        The call returns immediately; the socket itself is created, bound and
        put into listening mode inside the acceptor thread.  *authkey* is
        handed to msgauth.send_challenge() for every new connection.
        """
        thr = threading.Thread(target=self._acceptor_thread, args=(address, authkey))
        thr.daemon = True
        thr.start()

    # Internal thread that accepts client connections and launches handler threads
    def _acceptor_thread(self, address, authkey):
        """Accept connections forever, spawning one handler thread per client."""
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._sock.bind(address)
            self._sock.listen(5)
            print("Listening on ", address)
            while True:
                client_sock, addr = self._sock.accept()
                thr = threading.Thread(target=self._client_handler_thread,
                                       args=(client_sock, addr, authkey))
                thr.daemon = True
                thr.start()
        finally:
            # The loop only exits through an exception (bind/listen/accept
            # failure); release the listening socket instead of leaking it.
            self._sock.close()

    # Client handler thread. Receives messages and sends responses
    def _client_handler_thread(self, client_sock, addr, authkey):
        """Authenticate one client, then relay its messages until it fails.

        Each received message is queued together with a reply slot; the thread
        then blocks until send_bytes() fills the slot and writes the reply
        back to the client.  Any exception raised while receiving or sending
        ends the connection.
        """
        print("Got connection from", addr)
        try:
            # Authentication. If bad, immediately drop the connection and return
            if not msgauth.send_challenge(client_sock, authkey):
                print("Bad authentication")
                return

            msgsock = msgsocket.MessageSocket(client_sock)
            reply = ReplyData()          # One slot, reused for every message
            while True:
                try:
                    # Receive an incoming message and queue it
                    msg = msgsock.recv()
                    reply.msg = None
                    reply.evt.clear()
                    self._messages.put((msg, reply))

                    # Wait for the reply to be set and send it back
                    reply.evt.wait()
                    msgsock.send(reply.msg)
                except Exception as e:
                    print("Closed connection from %s: %s" % (addr, e))
                    break
        finally:
            # Always release the client socket, including when authentication
            # or MessageSocket construction raises instead of returning.
            client_sock.close()

    # Receive a message from any of the connected clients (via queue)
    def recv_bytes(self):
        """Return the next queued message, blocking until one is available.

        Raises:
            RuntimeError: if the previously received message has not been
                answered with send()/send_bytes() yet.
        """
        # If a reply was already pending, it's an error to call recv() again
        if self._pending_reply is not None:
            raise RuntimeError("Must call send() after recv()")

        # Get the message and set the pending reply value
        msg, self._pending_reply = self._messages.get()
        return msg

    # Send a message back to the connected clients
    def send_bytes(self, msg):
        """Deliver *msg* as the reply to the most recently received message.

        Raises:
            RuntimeError: if no message is awaiting a reply.
        """
        # If no reply is pending, it's an error to call send()
        if self._pending_reply is None:
            raise RuntimeError("Must call recv() first")

        # Set the reply message and signal the handler thread that it's ready
        self._pending_reply.msg = msg
        self._pending_reply.evt.set()
        self._pending_reply = None

    # Pickle support
    def send(self, obj):
        """Pickle *obj* and send it as the reply (see send_bytes())."""
        self.send_bytes(pickle.dumps(obj))

    def recv(self):
        """Receive the next message and return the unpickled object.

        SECURITY: pickle.loads() can execute arbitrary code while decoding.
        The only gate in front of this call is the msgauth challenge, so use
        it only with peers that are trusted to hold the authkey.
        """
        return pickle.loads(self.recv_bytes())
97
98 # Test code
if __name__ == '__main__':
    # Manual smoke test: run an echo server on the port given on the command
    # line.  Every request is answered with the tuple ('response', request).
    import sys
    import time

    if len(sys.argv) != 2:
        print("Usage: %s port" % sys.argv[0])
        raise SystemExit(1)

    def test_server(port):
        """Serve echo replies forever on *port*, on all interfaces."""
        print("Echo server running on port", port)
        server = ReplySocket()
        server.bind(("", port), authkey=b"peekaboo")
        while True:
            request = server.recv()
            print("Got message: ", request)
            server.send(('response', request))

    # Run the server in a daemon thread so the main thread stays free.
    worker = threading.Thread(target=test_server,
                              args=(int(sys.argv[1]),),
                              daemon=True)
    worker.start()

    # This is so Ctrl-C works: the main thread idles in short sleeps instead
    # of blocking inside the server loop.
    while True:
        time.sleep(1)
124
125
Something went wrong with that request. Please try again.