-
Notifications
You must be signed in to change notification settings - Fork 1
/
message.py
162 lines (119 loc) · 4.61 KB
/
message.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# -*- coding: utf-8 -*-
"""Tests messaging with basic_get"""
from five import grok
from zope import schema
from zope.interface import Interface
from zope.component import getUtility, queryUtility
from z3c.form import button
from plone.directives import form
from Products.CMFCore.interfaces import ISiteRoot
from Products.statusmessages.interfaces import IStatusMessage
from collective.zamqp.interfaces import IProducer, IConsumer
from collective.zamqp.interfaces import IMessageArrivedEvent
from collective.zamqp.producer import Producer
from collective.zamqp.consumer import Consumer
from collective.zamqp.connection import BlockingChannel
from zope.i18nmessageid import MessageFactory as ZopeMessageFactory
_ = ZopeMessageFactory("collective.zamqpdemo")
import logging
logger = logging.getLogger("collective.zamqpdemo")
class IMessage(Interface):
    """Marker interface for the demo messages.

    The consumers in this module set it as their ``marker`` and the
    ``logMessage`` subscriber is registered for it.
    """
class QueueMessageProducer(Producer):
    """Producer publishing plain messages to the "awaiting" exchange.

    Named "amqpdemo.messages"; the message form in this module looks the
    producer up by that name.
    """
    grok.name("amqpdemo.messages")

    connection_id = "superuser"
    exchange = "awaiting"
    serializer = "plain"

    durable = False
    auto_delete = False

    @property
    def routing_key(self):
        """Return the routing key built from the current site's id.

        When the site root lookup yields nothing (or a falsy object), the
        literal placeholder key "amqpdemo.${site}.messages" is returned.
        """
        portal = queryUtility(ISiteRoot)
        if not portal:
            return "amqpdemo.${site}.messages"
        return "amqpdemo.%s.messages" % portal.getId()
class AwaitingMessages(Consumer):
    """Consumer that only exists to declare the site-specific awaiting queue.

    It never starts consuming on its own: ``on_ready_to_consume`` is
    overridden to do nothing. Other code in this module fetches messages
    from the queue one at a time with ``basic_get``.
    """
    grok.name("amqpdemo.${site_id}.awaiting")  # doubles as the queue name

    connection_id = "superuser"
    exchange = "awaiting"
    routing_key = "amqpdemo.${site_id}.messages"
    queue_arguments = {
        # rejected messages get redirected to the "messages" exchange
        "x-dead-letter-exchange": "messages",
    }

    marker = IMessage

    durable = False
    auto_delete = False

    def on_ready_to_consume(self):
        """Do nothing, which disables auto-consuming for this consumer."""
class MessageConsumer(Consumer):
    """Consumer bound to the "messages" exchange with a generated queue name.

    Arriving messages are marked with ``IMessage``, the interface the
    ``logMessage`` subscriber in this module is registered for.
    """
    grok.name("amqpdemo.${site_id}.messages")

    connection_id = "superuser"
    exchange = "messages"
    routing_key = "amqpdemo.${site_id}.messages"
    queue = ""  # empty name: use a generated queue name

    marker = IMessage

    durable = False
class IQueueForm(form.Schema):
    """Schema of the queue-message form: a single text field."""

    message = schema.Text(title=_(u"Message"))
class MessageForm(form.SchemaForm):
    """Form that publishes the entered text through the demo producer.

    The form ignores its context and exposes a single "Send" button.
    """
    grok.name("queue-message")
    grok.context(Interface)

    schema = IQueueForm
    ignoreContext = True

    label = _(u"Queue Message")
    description = _(u"Tests 'basic.get'.")

    def update(self):
        """Set ``disable_border`` on the request, then run the base update."""
        self.request.set("disable_border", True)
        super(MessageForm, self).update()

    @button.buttonAndHandler(_(u"Send"))
    def queueMessage(self, action):
        """Publish the submitted message and confirm it with a status message.

        When the submitted data does not validate, nothing is published and
        the form's error status is set instead.
        """
        data, errors = self.extractData()
        if errors:
            # Invalid input (e.g. an empty message) leaves "message" out of
            # ``data``; bail out here instead of failing with a KeyError.
            self.status = self.formErrorsMessage
            return

        producer = getUtility(IProducer, name="amqpdemo.messages")
        producer._register()  # register for transaction
        producer.publish(data["message"])

        IStatusMessage(self.request).addStatusMessage(
            u"Queued: %s" % data["message"],
            "info")
class PurgeView(grok.View):
    """View that rejects one message from the site's awaiting queue.

    It renders nothing itself; after the work is done it redirects back to
    the context.
    """
    grok.name("purge-queue")
    grok.context(Interface)

    def update(self):
        """Fetch and reject one awaiting message, then redirect.

        A status message is added and the redirect is issued whether or not
        a message was available.
        """
        queue_name = "amqpdemo.%s.awaiting" % getUtility(ISiteRoot).getId()
        # presumably the consumer's length is the number of queued messages
        pending = len(getUtility(IConsumer, name=queue_name))

        if pending:
            with BlockingChannel("demo") as channel:
                method, properties, body = channel.basic_get(
                    queue=queue_name)
                got_message = properties and body  # quick Basic.GetOk test
                if got_message:
                    # reject without requeue; the queue's dead-letter
                    # exchange is set to "messages"
                    channel.basic_reject(
                        delivery_tag=method.delivery_tag, requeue=False)

        status = IStatusMessage(self.request)
        status.addStatusMessage(u"Purge requested", "info")

        target_url = self.context.absolute_url()
        self.request.response.redirect(target_url)

    def render(self):
        """Return an empty body; ``update`` has already issued a redirect."""
        return u""
@grok.subscribe(IMessage, IMessageArrivedEvent)
def logMessage(message, event):
    """Log an arrived message, reject the next awaiting one, then ack.

    After logging the message body, the site's awaiting queue is checked;
    if it reports any messages, one is fetched with ``basic_get`` and
    rejected without requeue. Finally the arrived message is acknowledged.

    :param message: the arrived message (provides ``body`` and ``ack()``)
    :param event: the message-arrived event (unused)
    """
    # Pass the body as a logger argument (as the call below already does):
    # formatting is deferred and cannot fail on tuple-valued bodies.
    logger.info("Purged: %s", message.body)

    site = getUtility(ISiteRoot)
    queue = "amqpdemo.%s.awaiting" % site.getId()
    # presumably the consumer's length is the number of queued messages
    messages = len(getUtility(IConsumer, name=queue))
    logger.info("Messages left: %s", messages)

    if messages:
        with BlockingChannel("demo") as channel:
            method, properties, body = channel.basic_get(queue=queue)
            if properties and body:  # quick test for Basic.GetOk
                channel.basic_reject(delivery_tag=method.delivery_tag,
                                     requeue=False)
    message.ack()