-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
fileio1.py
102 lines (79 loc) · 2.57 KB
/
fileio1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File Transfer model #1
#
# In which the server sends the entire file to the client in
# large chunks with no attempt at flow control.
from __future__ import print_function
from threading import Thread
import zmq
from zhelpers import socket_set_hwm, zpipe
CHUNK_SIZE = 250000
def client_thread(ctx, pipe):
dealer = ctx.socket(zmq.DEALER)
dealer.connect("tcp://127.0.0.1:6000")
dealer.send(b"fetch")
total = 0 # Total bytes received
chunks = 0 # Total chunks received
while True:
try:
chunk = dealer.recv()
except zmq.ZMQError as e:
if e.errno == zmq.ETERM:
return # shutting down, quit
else:
raise
chunks += 1
size = len(chunk)
total += size
if size == 0:
break # whole file received
print ("%i chunks received, %i bytes" % (chunks, total))
pipe.send(b"OK")
# File server thread
# The server thread reads the file from disk in chunks, and sends
# each chunk to the client as a separate message. We only have one
# test file, so open that once and then serve it out as needed:
def server_thread(ctx):
file = open("testdata", "rb")
router = ctx.socket(zmq.ROUTER)
# Default HWM is 1000, which will drop messages here
# since we send more than 1,000 chunks of test data,
# so set an infinite HWM as a simple, stupid solution:
socket_set_hwm(router, 0)
router.bind("tcp://*:6000")
while True:
# First frame in each message is the sender identity
# Second frame is "fetch" command
try:
identity, command = router.recv_multipart()
except zmq.ZMQError as e:
if e.errno == zmq.ETERM:
return # shutting down, quit
else:
raise
assert command == b"fetch"
while True:
data = file.read(CHUNK_SIZE)
router.send_multipart([identity, data])
if not data:
break
# File main thread
# The main task starts the client and server threads; it's easier
# to test this as a single process with threads, than as multiple
# processes:
def main():
# Start child threads
ctx = zmq.Context()
a,b = zpipe(ctx)
client = Thread(target=client_thread, args=(ctx, b))
server = Thread(target=server_thread, args=(ctx,))
client.start()
server.start()
# loop until client tells us it's done
try:
print (a.recv())
except KeyboardInterrupt:
pass
del a,b
ctx.term()
if __name__ == '__main__':
main()