forked from ask/celery
-
Notifications
You must be signed in to change notification settings - Fork 1
/
testconn.py
120 lines (105 loc) · 3.35 KB
/
testconn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import settings
from django.core.management import setup_environ
setup_environ(settings)
from carrot.connection import DjangoAMQPConnection
from carrot.messaging import Messaging
from amqplib import client_0_8 as amqp
from celery.task import dmap
import operator
import simplejson
import time
import multiprocessing
import logging
def get_logger():
    """Return the multiprocessing logger, set to INFO and logging to stderr.

    ``multiprocessing.log_to_stderr()`` attaches a *new* stream handler
    every time it is called.  This function is invoked repeatedly (once per
    received message), so the handler is only installed when the logger
    does not have one yet; otherwise every record would be emitted once per
    previous call.
    """
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        multiprocessing.log_to_stderr()
    return logger
class MyMessager(Messaging):
    """Messaging subclass bound to the ``conntest`` queue and exchange."""

    # Queue, exchange and routing key all share the same name.
    queue = exchange = routing_key = "conntest"
def _create_conn():
    """Open a raw amqplib connection using the Django AMQP settings.

    Reads ``AMQP_SERVER``, ``AMQP_USER``, ``AMQP_PASSWORD`` and
    ``AMQP_VHOST`` from ``django.conf.settings`` and returns the new
    ``amqp.Connection``.  The caller is responsible for closing it.
    """
    from django.conf import settings

    params = {
        "host": settings.AMQP_SERVER,
        "userid": settings.AMQP_USER,
        "password": settings.AMQP_PASSWORD,
        "virtual_host": settings.AMQP_VHOST,
        "insist": False,
    }
    return amqp.Connection(**params)
def _send2(msg):
    """Publish *msg* to the ``conntest`` exchange over a fresh connection.

    A new connection is opened for every call (that is the point of the
    stress test).  The connection is closed in a ``finally`` block so it is
    released even when opening the channel or publishing raises.

    :param msg: message body to wrap in an ``amqp.Message``.
    """
    conn = _create_conn()
    try:
        channel = conn.channel()
        message = amqp.Message(msg)
        # delivery_mode 2 marks the message as persistent in AMQP.
        message.properties["delivery_mode"] = 2
        channel.basic_publish(message, exchange="conntest",
                              routing_key="conntest")
    finally:
        conn.close()
def _recv2():
    """Fetch at most one message from the ``conntest`` queue and print it.

    Opens a fresh connection, declares the queue and exchange, binds them,
    then does a single ``basic_get``.  If a message was returned it is
    acknowledged and its body is printed.  The connection is closed in a
    ``finally`` block so a failure in any of these steps does not leak it.
    """
    conn = _create_conn()
    try:
        channel = conn.channel()
        channel.queue_declare(queue="conntest", durable=True,
                              exclusive=False, auto_delete=False)
        channel.exchange_declare(exchange="conntest", type="direct",
                                 durable=True, auto_delete=False)
        channel.queue_bind(queue="conntest", exchange="conntest",
                           routing_key="conntest")
        m = channel.basic_get("conntest")
        if m:
            channel.basic_ack(m.delivery_tag)
            print("RECEIVED MSG: %s" % m.body)
    finally:
        conn.close()
def send_a_message(msg):
    """Send ``{"message": msg}`` through ``MyMessager`` on a new connection.

    The connection is closed in a ``finally`` block so that a failed send
    does not leave it open.

    :param msg: value stored under the ``"message"`` key of the payload.
    """
    conn = DjangoAMQPConnection()
    try:
        MyMessager(connection=conn).send({"message": msg})
    finally:
        conn.close()
def discard_all():
    """Call ``discard_all()`` on the ``MyMessager`` consumer.

    Uses a dedicated connection which is closed in a ``finally`` block so
    it is released even if the discard call raises.
    """
    conn = DjangoAMQPConnection()
    try:
        MyMessager(connection=conn).consumer.discard_all()
    finally:
        conn.close()
def receive_a_message():
    """Fetch one message via ``MyMessager``, log its payload and ack it.

    Opens a new connection and calls ``fetch()``.  When a message is
    returned, its body is decoded as JSON, the ``"message"`` entry is
    logged at INFO level and the message is acknowledged.  The connection
    is closed in a ``finally`` block so that a failure while fetching,
    decoding or acknowledging does not leak it.
    """
    logger = get_logger()
    conn = DjangoAMQPConnection()
    try:
        m = MyMessager(connection=conn).fetch()
        if m:
            msg = simplejson.loads(m.body)
            logger.info("Message receieved: %s" % msg.get("message"))
            m.ack()
    finally:
        conn.close()
def connection_stress_test():
    """Send and receive messages forever, one new connection per operation.

    Discards the queue contents first, then loops: send, pause 0.1 s,
    receive, and print the running total.  Never returns.
    """
    payload = "FOOBARBAZ!!!"
    sent = 0
    discard_all()
    while True:
        send_a_message(payload)
        time.sleep(0.1)
        receive_a_message()
        sent = sent + 1
        print("Sent %d message(s)" % (sent,))
import multiprocessing
def connection_stress_test_mp():
    """Run the connection stress test through a pool of 10 processes.

    Each send and each receive is dispatched synchronously to a pool worker
    with ``pool.apply``.  The loop never ends on its own; when it is
    interrupted (e.g. Ctrl-C) or a call raises, the pool is terminated and
    joined so no worker processes are left behind.
    """
    message_count = 0
    pool = multiprocessing.Pool(10)
    try:
        discard_all()
        while True:
            pool.apply(send_a_message, ["FOOBARBAZ!!!"])
            time.sleep(0.1)
            # receive_a_message has no useful return value, so the result
            # of apply() is deliberately not kept.
            pool.apply(receive_a_message)
            message_count += 1
            print("Sent %d message(s)" % message_count)
    finally:
        pool.terminate()
        pool.join()
def connection_stress_test2():
    """Stress the broker forever using the raw amqplib helpers.

    Loops: ``_send2``, pause 0.1 s, ``_recv2``, then print the running
    total of messages sent.  Never returns.
    """
    payload = "FOOBARBAZ!!!"
    sent = 0
    while True:
        _send2(payload)
        time.sleep(0.1)
        _recv2()
        sent = sent + 1
        print("Sent %d message(s)" % (sent,))
def task_stress_test():
    """Repeatedly run three additions through ``dmap`` and report progress.

    Each iteration passes ``operator.add`` and three argument pairs to
    ``dmap``, prints the result, and prints the running number of tasks
    submitted.  Never returns.
    """
    executed = 0
    while True:
        pairs = [[2, 2], [4, 4], [8, 8]]
        batch_size = len(pairs)  # three tasks per round
        result = dmap(operator.add, pairs)
        print("[2+2, 4+4, 8+8] = %s" % (result,))
        executed = executed + batch_size
        print("Executed %d task(s)" % (executed,))
if __name__ == "__main__":
    # Script entry point: runs the task stress test.
    # The multiprocessing connection test is left disabled:
    # connection_stress_test_mp()
    task_stress_test()