File tree Expand file tree Collapse file tree 2 files changed +86
-0
lines changed
Expand file tree Collapse file tree 2 files changed +86
-0
lines changed Original file line number Diff line number Diff line change 1+ #!/usr/bin/env php
2+ <?php
3+ /**
4+ @author Chimdi Azubuike <me@chimdi.com>
5+ */
6+
7+ //Establish connection to AMQP
8+ $ connection = new AMQPConnection ();
9+ $ connection ->setHost ('127.0.0.1 ' );
10+ $ connection ->setLogin ('guest ' );
11+ $ connection ->setPassword ('guest ' );
12+ $ connection ->connect ();
13+
14+
15+
16+ //Create and declare channel
17+ $ channel = new AMQPChannel ($ connection );
18+ $ channel ->setPrefetchCount (1 );
19+
20+
21+ $ routing_key = 'task_queue ' ;
22+
23+ try {
24+ $ queue = new AMQPQueue ($ channel );
25+ $ queue ->setName ($ routing_key );
26+ $ queue ->setFlags (AMQP_DURABLE );
27+ $ queue ->declareQueue ();
28+
29+ }catch (Exception $ ex ){
30+ print_r ($ ex );
31+ }
32+
33+
34+ //Read from stdin
35+ $ data = implode (' ' , array_slice ($ argv ,1 ));
36+ if (empty ($ data ))
37+ $ data = "Hello World! " ;
38+
39+ $ exchange = new AMQPExchange ($ channel );
40+ $ exchange ->publish ($ data , $ routing_key );
41+
42+ echo " [x] Sent {$ data }" , PHP_EOL ;
43+
44+ $ connection ->disconnect ();
45+
Original file line number Diff line number Diff line change 1+ #!/usr/bin/env php
2+ <?php
3+ /**
4+ @author Chimdi Azubuike <me@chimdi.com>
5+ */
6+
7+ //Establish connection AMQP
8+ $ connection = new AMQPConnection ();
9+ $ connection ->setHost ('127.0.0.1 ' );
10+ $ connection ->setLogin ('guest ' );
11+ $ connection ->setPassword ('guest ' );
12+ $ connection ->connect ();
13+
14+ //Create and declare channel
15+ $ channel = new AMQPChannel ($ connection );
16+
17+ $ routing_key = 'task_queue ' ;
18+
19+ $ callback_func = function (AMQPEnvelope $ message , AMQPQueue $ q ) use (&$ max_jobs ) {
20+ echo " [x] Received: " , $ message ->getBody (), PHP_EOL ;
21+ sleep (1 );
22+ echo " [X] Done " , PHP_EOL ;
23+ $ q ->ack ($ message ->getDeliveryTag ());
24+ };
25+
26+ try {
27+ $ queue = new AMQPQueue ($ channel );
28+ $ queue ->setName ($ routing_key );
29+ $ queue ->setFlags (AMQP_DURABLE );
30+ $ queue ->declareQueue ();
31+
32+
33+ echo ' [*] Waiting for logs. To exit press CTRL+C ' , PHP_EOL ;
34+ $ queue ->consume ($ callback_func );
35+ }catch (AMQPQueueException $ ex ){
36+ print_r ($ ex );
37+ }catch (Exception $ ex ){
38+ print_r ($ ex );
39+ }
40+
41+ $ connection ->disconnect ();
You can’t perform that action at this time.
0 commit comments