# pyzmq: the Python interface to zmq, the fastest message queue

zmq (ZeroMQ) is a de facto standard on Linux and is billed as the fastest message queue. pyzmq is its Python binding.

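A message queue is a container that holds messages while they are in transit. zmq, however, seems to have little to do with holding anything. First, it is decentralized: there is no broker in the middle that schedules and stores messages, which sets it far apart from Kafka and Redis. Stranger still, on the surface zmq looks like a socket framework: a complete program appears to contain nothing but message passing, with no sign of keywords such as `set` or `save`.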


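In fact, zmq is a textbook case of "pattern-oriented programming". It encapsulates message storage, multithreading, and communication, and exposes only the patterns through which they are used. Its underlying philosophy is to abstract every usage scenario into a combination of a few patterns; implementing those patterns is how applications for those scenarios get built.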
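
To make the point about hidden storage concrete, here is a minimal sketch, assuming a single process and the `inproc` transport (the endpoint name `buffer-demo` is made up for illustration). The sender finishes all of its sends before the receiver reads anything, and no explicit queue object appears in the code.

```python
import zmq

ctx = zmq.Context()

# PULL binds first so the inproc endpoint exists before PUSH connects.
receiver = ctx.socket(zmq.PULL)
receiver.bind("inproc://buffer-demo")    # hypothetical endpoint name
receiver.setsockopt(zmq.RCVTIMEO, 2000)  # fail after 2 s instead of hanging

sender = ctx.socket(zmq.PUSH)
sender.connect("inproc://buffer-demo")

# All three sends return before anything has been read:
# the messages wait in zmq's internal queue.
for i in range(3):
    sender.send(b"msg %d" % i)

received = [receiver.recv() for _ in range(3)]
print(received)

sender.close()
receiver.close()
ctx.term()
```

The only things the program names are the pattern (`PUSH`/`PULL`) and the endpoint; buffering and delivery are left to the library.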

The sections below introduce its basic usage and give examples organized by use case.

In [1]:
import zmq
print(zmq.pyzmq_version())

16.0.2


## Context

In [2]:
import time
context = zmq.Context()

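## Sockets

Sockets are created from a context.

Common ways of using zmq sockets are:

+ one-to-one communication (unicast)
+ many-to-one (multiple clients, one server)
+ one-to-many (multicast)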

In [3]:
socket = context.socket(zmq.REP)
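
As a small sketch of the context/socket lifecycle (an illustration, not part of the original notebook): both `zmq.Context` and the sockets it creates can be used as context managers, which takes care of closing them.

```python
import zmq

# Context and Socket both work as context managers, so cleanup is automatic.
with zmq.Context() as ctx:
    with ctx.socket(zmq.REP) as rep, ctx.socket(zmq.REQ) as req:
        # The socket type is fixed at creation time.
        types = (rep.getsockopt(zmq.TYPE), req.getsockopt(zmq.TYPE))
        print(types == (zmq.REP, zmq.REQ))
    # Leaving the inner block closes both sockets.
    closed = (rep.closed, req.closed)
    print(closed)
```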

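## Messaging patterns

> Point-to-point communication:


zmq supports a point-to-point pattern (`zmq.PAIR`) with these characteristics:

+ communication is bidirectional
+ the sockets are stateless
+ only one peer can be connected
+ the server listens on a specific port, and only one client connects to it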
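
The properties in the list can be tried out in a single process. The sketch below assumes the `inproc` transport and a made-up endpoint name (`pair-demo`); it shows that either side may send first and may send several messages in a row.

```python
import zmq

ctx = zmq.Context()
server = ctx.socket(zmq.PAIR)
server.bind("inproc://pair-demo")  # hypothetical endpoint name
client = ctx.socket(zmq.PAIR)
client.connect("inproc://pair-demo")
for sock in (server, client):
    sock.setsockopt(zmq.RCVTIMEO, 2000)  # fail instead of hanging

# Either side may send first, and may send several times in a row.
client.send(b"c1")
client.send(b"c2")
server.send(b"s1")

from_client = [server.recv(), server.recv()]
from_server = client.recv()
print(from_client, from_server)

client.close()
server.close()
ctx.term()
```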


### Synchronous version

Server:

In [11]:
%%writefile codes/C_zmq/pairServer.py
import time

import zmq

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
addr = "tcp://0.0.0.0:{port}".format(port=port)
socket.bind(addr)
print("server is running on {addr}".format(addr=addr))
while True:
    # PAIR has no request/reply lockstep, so the server may send first.
    socket.send(b"Server message to client")
    msg = socket.recv()
    print(msg)
    time.sleep(1)

Overwriting codes/C_zmq/pairServer.py


Client:

In [12]:
%%writefile codes/C_zmq/pairClient.py
import time

import zmq

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

while True:
    msg = socket.recv()
    print(msg)
    # Two sends in a row are allowed on PAIR; the server reads one per loop,
    # so the surplus messages wait in the queue.
    socket.send(b"client message to server")
    socket.send(b"client message to server")
    time.sleep(1)

Overwriting codes/C_zmq/pairClient.py


Usage:

    python pairServer.py
    python pairClient.py

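### Asynchronous version

The asynchronous version can be written with aiozmq. pyzmq does ship official asyncio support, but in the version used here (16.0.2) it requires zmq's own event loop, so it cannot be integrated with the many frameworks that run on uvloop.

Server: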

In [19]:
%%writefile codes/aio_zmq/pairServer.py

import asyncio
import aiozmq
import zmq

port = "5556"
addr = "tcp://0.0.0.0:{port}".format(port=port)


async def main():
    server = await aiozmq.create_zmq_stream(
        zmq.PAIR,
        bind=addr)
    print(list(server.transport.bindings())[0])
    while True:
        # write() takes a list of frames; send the text as a single frame
        server.write([b"Server message to client"])
        try:
            data = await server.read()
        except asyncio.CancelledError:
            break
        print("Async server read: {data}".format(data=data))
        # sleep() is a coroutine and only pauses when awaited
        await asyncio.sleep(1)
    server.close()

if __name__ == "__main__":
    print("server is running on {addr}".format(addr=addr))
    asyncio.get_event_loop().run_until_complete(main())


Overwriting codes/aio_zmq/pairServer.py


Client:

In [20]:
%%writefile codes/aio_zmq/pairClient.py
import asyncio
import aiozmq
import zmq

port = "5556"
# connect to the loopback address; 0.0.0.0 is only meaningful for bind
addr = "tcp://127.0.0.1:{port}".format(port=port)


async def main():
    client = await aiozmq.create_zmq_stream(zmq.PAIR)

    await client.transport.connect(addr)
    msg = await client.read()
    print("Async client read: {msg}".format(msg=msg))
    # write() takes a list of frames; each call sends one single-frame message
    client.write([b"client message to server"])
    client.write([b"client message to server"])
    # give the queued messages a moment to go out before closing
    await asyncio.sleep(1)
    client.close()

if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())

Overwriting codes/aio_zmq/pairClient.py
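
For comparison, newer pyzmq releases (17.0 onward, as far as I know) let `zmq.asyncio` sockets run on the standard asyncio event loop, so a separate loop is no longer required there. The following is a minimal sketch under that assumption; it uses the `inproc` transport with a made-up endpoint name and is not the aiozmq code above.

```python
import asyncio

import zmq
import zmq.asyncio


async def pair_roundtrip():
    ctx = zmq.asyncio.Context()
    server = ctx.socket(zmq.PAIR)
    server.bind("inproc://async-pair-demo")  # hypothetical endpoint name
    client = ctx.socket(zmq.PAIR)
    client.connect("inproc://async-pair-demo")

    # send() and recv() return awaitables on zmq.asyncio sockets
    await server.send(b"Server message to client")
    to_client = await client.recv()
    await client.send(b"client message to server")
    to_server = await server.recv()

    client.close()
    server.close()
    ctx.term()
    return to_client, to_server


result = asyncio.run(pair_roundtrip())
print(result)
```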


> Client/server model (request-reply)

![pair](imgs/C_zmq/c-s.png)

The most common messaging pattern is client/server. zmq uses `zmq.REQ` to mark the requesting side and `zmq.REP` to mark the replying side.
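
Unlike `PAIR`, a `REQ`/`REP` pair works in strict lockstep: send, receive, send, receive. The sketch below (single process, `inproc` transport, made-up endpoint name) shows one round trip and what happens when the requester tries to send twice in a row.

```python
import zmq

ctx = zmq.Context()
rep = ctx.socket(zmq.REP)
rep.bind("inproc://reqrep-demo")  # hypothetical endpoint name
req = ctx.socket(zmq.REQ)
req.connect("inproc://reqrep-demo")
for sock in (req, rep):
    sock.setsockopt(zmq.RCVTIMEO, 2000)  # fail instead of hanging
    sock.setsockopt(zmq.LINGER, 0)

req.send(b"Hello")
try:
    # A second send before the reply arrives breaks the REQ state machine.
    req.send(b"Hello again")
    second_send_error = None
except zmq.ZMQError as exc:
    second_send_error = exc.errno

request = rep.recv()
rep.send(b"World")
reply = req.recv()
print(request, reply, second_send_error == zmq.EFSM)

req.close()
rep.close()
ctx.term()
```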

Server:

In [9]:
%%writefile codes/C_zmq/reqrep_server.py
import zmq
import time
import sys

port = "5556"
if len(sys.argv) > 1:
    port = sys.argv[1]
    int(port)  # raises ValueError if the argument is not a number

context = zmq.Context()
socket = context.socket(zmq.REP)

socket.bind("tcp://*:%s" % port)

while True:
    #  Wait for next request from client
    message = socket.recv()
    print("Received request: ", message)
    time.sleep(1)
    # zmq sends bytes, so encode the text reply
    socket.send(("World from %s" % port).encode("utf-8"))

Overwriting codes/C_zmq/reqrep_server.py


Client:

In [10]:
%%writefile codes/C_zmq/reqrep_client.py

import zmq
import sys

port = "5556"
if len(sys.argv) > 1:
    port = sys.argv[1]
    int(port)  # raises ValueError if the argument is not a number

if len(sys.argv) > 2:
    port1 = sys.argv[2]
    int(port1)

context = zmq.Context()
print("Connecting to server...")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:%s" % port)
if len(sys.argv) > 2:
    # a REQ socket connected to several servers spreads requests across them
    socket.connect("tcp://localhost:%s" % port1)

#  Do 10 requests, waiting each time for a response
for request in range(1, 11):
    print("Sending request ", request, "...")
    socket.send(b"Hello")
    #  Get the reply.
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")

Writing codes/C_zmq/reqrep_client.py


    python reqrep_server.py 5546
    python reqrep_server.py 5556
    python reqrep_client.py 5546 5556
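
The two-server run above relies on a `REQ` socket distributing its requests across every endpoint it is connected to. Here is a single-process sketch of that behavior, assuming the `inproc` transport and two made-up server names (`A` and `B`); a poller is used only to find out which server each request reached.

```python
import zmq

ctx = zmq.Context()
servers = {}
for name in (b"A", b"B"):
    rep = ctx.socket(zmq.REP)
    rep.bind("inproc://server-%s" % name.decode())  # hypothetical endpoints
    servers[name] = rep

req = ctx.socket(zmq.REQ)
req.setsockopt(zmq.RCVTIMEO, 2000)  # fail instead of hanging
for name in servers:
    req.connect("inproc://server-%s" % name.decode())

poller = zmq.Poller()
for rep in servers.values():
    poller.register(rep, zmq.POLLIN)

answered_by = []
for _ in range(4):
    req.send(b"Hello")
    # Find out which server the request landed on, then reply from it.
    ready = dict(poller.poll(2000))
    for name, rep in servers.items():
        if rep in ready:
            rep.recv()
            rep.send(b"World from " + name)
    answered_by.append(req.recv())
print(answered_by)

req.close()
for rep in servers.values():
    rep.close()
ctx.term()
```

Consecutive replies come from different servers, which is why the client never needs to know how many servers are running.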

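> Broadcast model (publish-subscribe)

![pair](imgs/C_zmq/Pub-Sub.png)

Broadcast is another common pattern; message push is its most typical application. In the example below, the broadcast server picks a random integer from 9999 to 10004 as the topic and a random integer from 1 to 214, minus 80, as the message data, then joins the two with a space to form the broadcast content.

The receiver takes in the broadcast content and uses

    socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

to filter by topic. It then receives 5 messages on that topic and computes their mean.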
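
One detail worth seeing in action is that the `SUBSCRIBE` filter is a byte-prefix match, not an exact topic comparison. The sketch below assumes a single process, the `inproc` transport, and a made-up endpoint name; the third message is there to show a longer topic that shares the prefix.

```python
import time

import zmq

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.bind("inproc://pubsub-demo")  # hypothetical endpoint name
sub = ctx.socket(zmq.SUB)
sub.connect("inproc://pubsub-demo")
sub.setsockopt(zmq.SUBSCRIBE, b"10001")
sub.setsockopt(zmq.RCVTIMEO, 500)

# Subscriptions reach the publisher asynchronously; anything published
# before they arrive is dropped, so pause briefly before sending.
time.sleep(0.1)

for payload in (b"10001 5", b"10002 7", b"100012 9"):
    pub.send(payload)

received = []
while True:
    try:
        received.append(sub.recv())
    except zmq.Again:  # nothing left within the timeout
        break
print(received)

sub.close()
pub.close()
ctx.term()
```

Because `b"100012 9"` also starts with `b"10001"`, it passes the filter; including the separator in the filter (for example `b"10001 "`) is one way to avoid that.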

Broadcast server:

In [56]:
%%writefile codes/C_zmq/pub_server.py

import zmq
import random
import sys
import time

port = "5556"
if len(sys.argv) > 1:
    port = sys.argv[1]
    int(port)  # raises ValueError if the argument is not a number

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
    topic = random.randrange(9999, 10005)
    messagedata = random.randrange(1, 215) - 80
    print("%d %d" % (topic, messagedata))
    # the topic is simply the leading bytes of the message
    socket.send(b"%d %d" % (topic, messagedata))
    time.sleep(1)

Writing codes/C_zmq/pub_server.py


Subscriber client:

In [58]:
%%writefile codes/C_zmq/sub_client.py
import sys
import zmq

port = "5556"
if len(sys.argv) > 1:
    port = sys.argv[1]
    int(port)  # raises ValueError if the argument is not a number

if len(sys.argv) > 2:
    port1 = sys.argv[2]
    int(port1)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting updates from weather server...")
socket.connect("tcp://localhost:%s" % port)

if len(sys.argv) > 2:
    socket.connect("tcp://localhost:%s" % port1)

# Subscribe to zipcode, default is NYC, 10001
topicfilter = b"10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)  # only messages starting with this prefix are delivered

# Process 5 updates
update_count = 5
total_value = 0
for update_nbr in range(update_count):
    string = socket.recv()
    topic, messagedata = string.split()
    total_value += int(messagedata)
    print(topic, messagedata)

# the mean divides by the number of updates, not by the last loop index
print("Average messagedata value for topic '%s' was %dF" % (topicfilter.decode(), total_value / update_count))
      

Writing codes/C_zmq/sub_client.py


    python pub_server.py 5556
    python pub_server.py 5546
    python sub_client.py 5556 5546

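> Producer-consumer model

![pair](imgs/C_zmq/p-c.png)

Producer-consumer is another very common model. The example below has three roles:

The producer generates the numbers 0 to 19999 and sends them to the consumers one at a time.

Each consumer picks out the even numbers and sends them on to the final collector.

The collector counts how many results each consumer sent.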
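
Before the full three-script version, here is a scaled-down sketch of the same pipeline in one process. It assumes the `inproc` transport, made-up endpoint names, two workers, and only ten numbers, and it drains the workers one after another instead of running them concurrently.

```python
import zmq

ctx = zmq.Context()

producer = ctx.socket(zmq.PUSH)
producer.bind("inproc://work")        # hypothetical endpoint names
collector = ctx.socket(zmq.PULL)
collector.bind("inproc://results")

# Each worker pulls work from the producer and pushes results to the collector.
workers = {}
for worker_id in (1, 2):
    receiver = ctx.socket(zmq.PULL)
    receiver.connect("inproc://work")
    sender = ctx.socket(zmq.PUSH)
    sender.connect("inproc://results")
    workers[worker_id] = (receiver, sender)

for num in range(10):
    producer.send_json({"num": num})

# Drain each worker in turn; poll() returns 0 once nothing arrives in 100 ms.
for worker_id, (receiver, sender) in workers.items():
    while receiver.poll(100):
        num = receiver.recv_json()["num"]
        if num % 2 == 0:
            sender.send_json({"consumer": worker_id, "num": num})

# The collector tallies how many results each worker delivered.
counts = {}
while collector.poll(100):
    result = collector.recv_json()
    counts[result["consumer"]] = counts.get(result["consumer"], 0) + 1
print(counts)

for receiver, sender in workers.values():
    receiver.close()
    sender.close()
producer.close()
collector.close()
ctx.term()
```

How the even numbers are split between the workers depends on how zmq distributes the work, so only the total (five results for the numbers 0 to 9) is fixed.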

Producer:

In [59]:
%%writefile codes/C_zmq/producer.py

import time
import zmq

def producer():
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    zmq_socket.bind("tcp://127.0.0.1:5557")
    # Start your result manager and workers before you start your producers
    for num in range(20000):
        work_message = { 'num' : num }
        zmq_socket.send_json(work_message)

producer()

Writing codes/C_zmq/producer.py


Consumer:

In [60]:
%%writefile codes/C_zmq/consumer.py
import time
import zmq
import random

def consumer():
    consumer_id = random.randrange(1,10005)
    print("I am consumer #%s" % (consumer_id))
    context = zmq.Context()
    # receive work
    consumer_receiver = context.socket(zmq.PULL)
    consumer_receiver.connect("tcp://127.0.0.1:5557")
    # send work
    consumer_sender = context.socket(zmq.PUSH)
    consumer_sender.connect("tcp://127.0.0.1:5558")
    
    while True:
        work = consumer_receiver.recv_json()
        data = work['num']
        result = { 'consumer' : consumer_id, 'num' : data}
        if data%2 == 0: 
            consumer_sender.send_json(result)

consumer()

Writing codes/C_zmq/consumer.py


Collector:

In [61]:
%%writefile codes/C_zmq/resultcollector.py

import time
import zmq
import pprint

def result_collector():
    context = zmq.Context()
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")
    collecter_data = {}
    for x in range(1000):
        result = results_receiver.recv_json()
        # count one more result for this consumer, starting from 0
        consumer = result['consumer']
        collecter_data[consumer] = collecter_data.get(consumer, 0) + 1
        if x == 999:
            pprint.pprint(collecter_data)

result_collector()

Writing codes/C_zmq/resultcollector.py


    python resultcollector.py
    python consumer.py
    python consumer.py
    python producer.py


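## Intermediary devices

In the one-to-one setups above both ends are fixed, but in practice we usually want clients and servers to be pluggable. This is where zmq devices come in.

![pair](imgs/C_zmq/devices.png)

The commonly used zmq devices are:

+ Queue
+ Forwarder
+ Streamer

> Queue

Passing `zmq.QUEUE` selects the queue device as the intermediary; it fits the client/server structure.

![pair](imgs/C_zmq/queue.png)

Intermediary: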

In [71]:
%%writefile codes/C_zmq/queue_device.py

import zmq

def main():
    context = zmq.Context(1)
    # Socket facing clients (XREP is the older name for ROUTER)
    frontend = context.socket(zmq.XREP)
    # Socket facing services (XREQ is the older name for DEALER)
    backend = context.socket(zmq.XREQ)
    try:
        frontend.bind("tcp://*:5559")
        backend.bind("tcp://*:5560")

        # blocks while it shuttles requests and replies between the two sides
        zmq.device(zmq.QUEUE, frontend, backend)
    except Exception as e:
        print(e)
        print("bringing down zmq device")
    finally:
        frontend.close()
        backend.close()
        context.term()

if __name__ == "__main__":
    main()

Writing codes/C_zmq/queue_device.py


Client:

In [72]:
%%writefile codes/C_zmq/queue_client.py
import zmq
import sys
import random

port = "5559"
context = zmq.Context()
print("Connecting to server...")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:%s" % port)
client_id = random.randrange(1, 10005)
#  Do 10 requests, waiting each time for a response
for request in range(1, 11):
    print("Sending request ", request, "...")
    socket.send(b"Hello from %d" % client_id)
    #  Get the reply.
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")

Writing codes/C_zmq/queue_client.py


Server:

In [73]:
%%writefile codes/C_zmq/queue_server.py
import zmq
import time
import sys
import random

port = "5560"
context = zmq.Context()
socket = context.socket(zmq.REP)
# the server connects to the device instead of binding its own port
socket.connect("tcp://localhost:%s" % port)
server_id = random.randrange(1, 10005)
while True:
    #  Wait for next request from client
    message = socket.recv()
    print("Received request: ", message)
    time.sleep(1)
    socket.send(b"World from server %d" % server_id)

Writing codes/C_zmq/queue_server.py


    python queue_device.py
    python queue_server.py
    python queue_server.py
    python queue_client.py
    python queue_client.py
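
To see roughly what the queue device does with each message, the sketch below forwards a single request and reply by hand. It is an illustration under assumptions (single process, `inproc` transport, made-up endpoint names, `ROUTER`/`DEALER` as the current names for `XREP`/`XREQ`), not the device's actual implementation.

```python
import zmq

ctx = zmq.Context()
frontend = ctx.socket(zmq.ROUTER)   # faces clients
frontend.bind("inproc://frontend")  # hypothetical endpoint names
backend = ctx.socket(zmq.DEALER)    # faces servers
backend.bind("inproc://backend")

client = ctx.socket(zmq.REQ)
client.connect("inproc://frontend")
server = ctx.socket(zmq.REP)
server.connect("inproc://backend")
for sock in (frontend, backend, client, server):
    sock.setsockopt(zmq.RCVTIMEO, 2000)  # fail instead of hanging

client.send(b"Hello")

# ROUTER prepends the sender's identity; REQ contributes the empty delimiter.
request_frames = frontend.recv_multipart()
backend.send_multipart(request_frames)

request = server.recv()  # REP strips the envelope and remembers it
server.send(b"World")

# The reply carries the same envelope, so ROUTER can route it to the client.
reply_frames = backend.recv_multipart()
frontend.send_multipart(reply_frames)
reply = client.recv()
print(len(request_frames), request, reply)

for sock in (client, server, frontend, backend):
    sock.close()
ctx.term()
```

The identity frame is what lets many clients and many servers share the same intermediary without their replies getting mixed up.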

> Forwarder

The forwarder is the intermediary for the broadcast pattern.

![pair](imgs/C_zmq/forwarder.png)


Intermediary

In [74]:
%%writefile codes/C_zmq/forwarder_device.py

import zmq

def main():
    context = zmq.Context(1)
    # Socket facing publishers
    frontend = context.socket(zmq.SUB)
    # Socket facing subscribers
    backend = context.socket(zmq.PUB)
    try:
        frontend.bind("tcp://*:5559")
        # an empty prefix subscribes to every topic
        frontend.setsockopt(zmq.SUBSCRIBE, b"")
        backend.bind("tcp://*:5560")

        zmq.device(zmq.FORWARDER, frontend, backend)
    except Exception as e:
        print(e)
        print("bringing down zmq device")
    finally:
        frontend.close()
        backend.close()
        context.term()

if __name__ == "__main__":
    main()

Writing codes/C_zmq/forwarder_device.py


Broadcast server

In [75]:
%%writefile codes/C_zmq/forwarder_server.py

import zmq
import random
import sys
import time

port = "5559"
context = zmq.Context()
socket = context.socket(zmq.PUB)
# the publisher connects to the forwarder instead of binding its own port
socket.connect("tcp://localhost:%s" % port)
publisher_id = random.randrange(0, 9999)
while True:
    topic = random.randrange(1, 10)
    messagedata = "server#%s" % publisher_id
    print("%s %s" % (topic, messagedata))
    socket.send(("%d %s" % (topic, messagedata)).encode("utf-8"))
    time.sleep(1)

Writing codes/C_zmq/forwarder_server.py


Subscriber client

In [76]:
%%writefile codes/C_zmq/forwarder_subscriber.py

import sys
import zmq

port = "5560"
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print("Collecting updates from server...")
socket.connect("tcp://localhost:%s" % port)
topicfilter = b"9"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)
for update_nbr in range(10):
    string = socket.recv()
    topic, messagedata = string.split()
    print(topic, messagedata)


Writing codes/C_zmq/forwarder_subscriber.py


    python forwarder_device.py
    python forwarder_subscriber.py
    python forwarder_server.py
    python forwarder_server.py


> Streamer

The streamer is the intermediary for the producer-consumer pattern; with it you can build a simple distributed computing system.

![pair](imgs/C_zmq/Streamer.png)

Intermediary

In [77]:
%%writefile codes/C_zmq/streamer_device.py

import zmq

def main():
    context = zmq.Context(1)
    # Socket facing producers
    frontend = context.socket(zmq.PULL)
    # Socket facing workers
    backend = context.socket(zmq.PUSH)
    try:
        frontend.bind("tcp://*:5559")
        backend.bind("tcp://*:5560")

        zmq.device(zmq.STREAMER, frontend, backend)
    except Exception as e:
        print(e)
        print("bringing down zmq device")
    finally:
        frontend.close()
        backend.close()
        context.term()

if __name__ == "__main__":
    main()

Writing codes/C_zmq/streamer_device.py


Producer

In [78]:
%%writefile codes/C_zmq/task_feeder.py

import time
import zmq

def producer():
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    zmq_socket.connect("tcp://127.0.0.1:5559")
    # Start your result manager and workers before you start your producers
    for num in range(20000):
        work_message = {'num': num}
        zmq_socket.send_json(work_message)
        time.sleep(1)

# run the producer when the script is executed
producer()


Writing codes/C_zmq/task_feeder.py


Consumer

In [79]:
%%writefile codes/C_zmq/task_worker.py

import sys
import time
import zmq
import random

def consumer():
    consumer_id = random.randrange(1,10005)
    print("I am consumer #%s" % (consumer_id))
    context = zmq.Context()
    # receive work
    consumer_receiver = context.socket(zmq.PULL)
    consumer_receiver.connect("tcp://127.0.0.1:5560")
    while True:
        work = consumer_receiver.recv_json()
        data = work['num']
        result = {'consumer': consumer_id, 'num': data}
        print(result)

consumer()

Writing codes/C_zmq/task_worker.py


    python streamer_device.py
    python task_feeder.py
    python task_worker.py
    python task_worker.py


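# Usage examples

## Combining pyzmq with subprocesses

Subprocesses let us keep the client and the server in a single file. This is very handy when building desktop tools and is one way of separating the front end from the back end.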


In [80]:
%%writefile codes/C_zmq/request_reply_processes.py

import zmq
import time
import sys
from multiprocessing import Process

def server(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:%s" % port)
    print("Running server on port: ", port)
    # serves only 5 requests and dies
    for reqnum in range(5):
        # Wait for next request from client
        message = socket.recv()
        print("Received request #%s: %s" % (reqnum, message))
        socket.send(("World from %s" % port).encode("utf-8"))

def client(ports=("5556",)):

    context = zmq.Context()
    print("Connecting to server with ports %s" % (list(ports),))
    socket = context.socket(zmq.REQ)
    for port in ports:
        socket.connect("tcp://localhost:%s" % port)
    # 4 servers x 5 replies each = 20 requests in total
    for request in range(20):
        print("Sending request ", request, "...")
        socket.send(b"Hello")
        message = socket.recv()
        print("Received reply ", request, "[", message, "]")
        time.sleep(1)


if __name__ == "__main__":
    # Now we can run a few servers
    server_ports = range(5550, 5558, 2)
    for server_port in server_ports:
        Process(target=server, args=(server_port,)).start()

    # Now we can connect a client to all these servers
    Process(target=client, args=(server_ports,)).start()

Writing codes/C_zmq/request_reply_processes.py


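## Poller

The example below shows a poller at work. It combines the broadcast pattern with the producer-consumer pattern; the code is as follows.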

In [82]:
%%writefile codes/C_zmq/zmqpolling.py

import zmq
import time
import sys
import random
from multiprocessing import Process

def server_push(port="5556"):
    """
    Producer: generates control messages on port 5556.
    """
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:%s" % port)
    print("Running server on port: ", port)
    # sends "Continue" six times, then "Exit"
    for reqnum in range(10):
        if reqnum < 6:
            socket.send(b"Continue")
        else:
            socket.send(b"Exit")
            break
        time.sleep(1)

def server_pub(port="5558"):
    """Broadcast service: sends 8 or 9 as the topic and publisher_id as the
    message, 10 times in total, on port 5558.
    """
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)
    publisher_id = random.randrange(0, 9999)
    print("Running server on port: ", port)
    # publishes only 10 messages and dies
    for reqnum in range(10):
        topic = random.randrange(8, 10)
        messagedata = "server#%s" % publisher_id
        print("%s %s" % (topic, messagedata))
        socket.send(("%d %s" % (topic, messagedata)).encode("utf-8"))
        time.sleep(1)

def client(port_push, port_sub):
    """Client: polls both server ports and keeps printing the broadcast
    messages; once the PULL side receives "Exit" it stops polling.
    """
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect("tcp://localhost:%s" % port_push)
    print("Connected to server with port %s" % port_push)
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect("tcp://localhost:%s" % port_sub)
    socket_sub.setsockopt(zmq.SUBSCRIBE, b"9")
    print("Connected to publisher with port %s" % port_sub)
    # Initialize poll set
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)

    should_continue = True
    while should_continue:
        # poll() blocks until at least one registered socket is readable
        socks = dict(poller.poll())
        if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
            message = socket_pull.recv()
            print("Received control command: %s" % message)
            if message == b"Exit":
                print("Received exit command, client will stop receiving messages")
                should_continue = False

        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            string = socket_sub.recv()
            topic, messagedata = string.split()
            print("Processing ... ", topic, messagedata)


if __name__ == "__main__":
    # Now we can run a few servers
    server_push_port = "5556"
    server_pub_port = "5558"
    Process(target=server_push, args=(server_push_port,)).start()
    Process(target=server_pub, args=(server_pub_port,)).start()
    Process(target=client, args=(server_push_port, server_pub_port,)).start()

Overwriting codes/C_zmq/zmqpolling.py
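
The polling loop can also be tried without spawning processes. The sketch below is a condensed single-process variant, assuming the `inproc` transport and made-up endpoint names; all messages are queued up front and the poller then reports which socket has something to read.

```python
import zmq

ctx = zmq.Context()
pusher = ctx.socket(zmq.PUSH)
pusher.bind("inproc://control")   # hypothetical endpoint names
publisher = ctx.socket(zmq.PUB)
publisher.bind("inproc://data")

puller = ctx.socket(zmq.PULL)
puller.connect("inproc://control")
subscriber = ctx.socket(zmq.SUB)
subscriber.connect("inproc://data")
subscriber.setsockopt(zmq.SUBSCRIBE, b"9")

pusher.send(b"Continue")
publisher.send(b"9 hello")
publisher.send(b"8 ignored")      # does not match the b"9" subscription
pusher.send(b"Exit")

poller = zmq.Poller()
poller.register(puller, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

control, data = [], []
should_continue = True
while should_continue:
    # poll() returns (socket, event) pairs for the sockets that are ready;
    # the 1000 ms timeout keeps the loop from blocking forever.
    ready = dict(poller.poll(1000))
    if not ready:
        break
    if ready.get(puller) == zmq.POLLIN:
        control.append(puller.recv())
        should_continue = control[-1] != b"Exit"
    if ready.get(subscriber) == zmq.POLLIN:
        data.append(subscriber.recv())
print(control, data)

for sock in (puller, subscriber, pusher, publisher):
    sock.close()
ctx.term()
```

Passing a timeout to `poll()` is a design choice for the sketch; the original script calls `poll()` without one and therefore waits indefinitely for the next message.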


## Streamer example

A streamer can be combined with multiple processes to coordinate a server with its workers.

In [1]:
%%writefile codes/C_zmq/streamerdevice.py

import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process

frontend_port = 5559
backend_port = 5560
number_of_workers = 2

def server():
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.connect("tcp://127.0.0.1:%d" % frontend_port)

    for i in range(0, 10):
        socket.send(b'#%d' % i)

def worker(work_num):
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.connect("tcp://127.0.0.1:%d" % backend_port)

    while True:
        message = socket.recv()
        print("Worker #%s got message! %s" % (work_num, message))
        time.sleep(1)

if __name__ == "__main__":
    # the device runs in its own process and relays PULL -> PUSH
    streamerdevice = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
    streamerdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port)
    streamerdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
    streamerdevice.setsockopt_in(zmq.IDENTITY, b'PULL')
    streamerdevice.setsockopt_out(zmq.IDENTITY, b'PUSH')
    streamerdevice.start()

    for work_num in range(number_of_workers):
        Process(target=worker, args=(work_num,)).start()
    time.sleep(1)

    server()

Writing codes/C_zmq/streamerdevice.py


## Queue

Using a queue device with multiple processes.

In [2]:
%%writefile codes/C_zmq/queuedevice.py

import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process
import random

frontend_port = 5559
backend_port = 5560
number_of_workers = 2

def server(backend_port):
    print("Connecting a server to queue device")
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect("tcp://127.0.0.1:%s" % backend_port)
    server_id = random.randrange(1, 10005)
    while True:
        message = socket.recv()
        print("Received request: ", message)
        socket.send(b"Response from %d" % server_id)


def client(frontend_port, client_id):
    print("Connecting a worker #%s to queue device" % client_id)
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://127.0.0.1:%s" % frontend_port)
    #  Do 4 requests, waiting each time for a response
    for request in range(1, 5):
        print("Sending request #%s" % request)
        socket.send(b"Request from client: %d" % client_id)
        #  Get the reply.
        message = socket.recv()
        print("Received reply ", request, "[", message, "]")


if __name__ == "__main__":
    queuedevice = ProcessDevice(zmq.QUEUE, zmq.XREP, zmq.XREQ)
    queuedevice.bind_in("tcp://127.0.0.1:%d" % frontend_port)
    queuedevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
    # libzmq 3+ splits the old HWM option into SNDHWM and RCVHWM
    for hwm in (zmq.SNDHWM, zmq.RCVHWM):
        queuedevice.setsockopt_in(hwm, 1)
        queuedevice.setsockopt_out(hwm, 1)
    queuedevice.start()
    time.sleep(2)

    Process(target=server, args=(backend_port,)).start()

    time.sleep(2)

    for client_id in range(number_of_workers):
        Process(target=client, args=(frontend_port, client_id,)).start()

Writing codes/C_zmq/queuedevice.py


## Monitored queue

In [3]:
%%writefile codes/C_zmq/monitoredqueue.py

import time
import zmq
from zmq.devices.monitoredqueuedevice import MonitoredQueue
from multiprocessing import Process
import random

frontend_port = 5559
backend_port = 5560
monitor_port = 5562
number_of_workers = 2

def monitordevice():
    # prefixes that tag each monitored message with its direction
    in_prefix = b'in'
    out_prefix = b'out'
    monitoringdevice = MonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, in_prefix, out_prefix)

    monitoringdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port)
    monitoringdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
    monitoringdevice.bind_mon("tcp://127.0.0.1:%d" % monitor_port)

    # libzmq 3+ splits the old HWM option into SNDHWM and RCVHWM
    for hwm in (zmq.SNDHWM, zmq.RCVHWM):
        monitoringdevice.setsockopt_in(hwm, 1)
        monitoringdevice.setsockopt_out(hwm, 1)
    # start() blocks while the device runs, so announce it first
    print("Program: Monitoring device is starting")
    monitoringdevice.start()

def server(backend_port):
    print("Program: Server connecting to device")
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect("tcp://127.0.0.1:%s" % backend_port)
    server_id = random.randrange(1, 10005)
    while True:
        message = socket.recv()
        print("Server: Received - %s" % message)
        socket.send(b"Response from server #%d" % server_id)

def client(frontend_port, client_id):
    print("Program: Worker #%s connecting to device" % client_id)
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://127.0.0.1:%s" % frontend_port)
    request_num = 1
    socket.send(b"Request #%d from client#%d" % (request_num, client_id))
    #  Get the reply.
    message = socket.recv_multipart()
    print("Client: Received - %s" % message)

def monitor():
    print("Starting monitoring process")
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    print("Collecting updates from server...")
    socket.connect("tcp://127.0.0.1:%s" % monitor_port)
    socket.setsockopt(zmq.SUBSCRIBE, b"")
    while True:
        string = socket.recv_multipart()
        print("Monitoring Client: %s" % string)


if __name__ == "__main__":
    monitoring_p = Process(target=monitordevice)
    monitoring_p.start()
    server_p = Process(target=server, args=(backend_port,))
    server_p.start()
    monitorclient_p = Process(target=monitor)
    monitorclient_p.start()
    time.sleep(2)

    for client_id in range(number_of_workers):
        Process(target=client, args=(frontend_port, client_id,)).start()

    time.sleep(10)
    server_p.terminate()
    monitorclient_p.terminate()
    monitoring_p.terminate()


Writing codes/C_zmq/monitoredqueue.py
