From 23e94c5b679f008d85e11fbc05941c86c9a505a1 Mon Sep 17 00:00:00 2001 From: cooldaemon Date: Wed, 9 Dec 2009 20:04:06 +0900 Subject: [PATCH] First commit. --- .gitignore | 8 + .shipit | 2 + Changes | 4 + MANIFEST.SKIP | 19 + Makefile.PL | 21 + README | 27 + amqp0-8.xml | 3908 +++++++++++++++++ config.json | 7 + lib/RabbitFoot.pm | 657 +++ lib/RabbitFoot/Cmd.pm | 9 + lib/RabbitFoot/Cmd/Command/bind_queue.pm | 74 + .../Cmd/Command/declare_exchange.pm | 98 + lib/RabbitFoot/Cmd/Command/declare_queue.pm | 82 + lib/RabbitFoot/Cmd/Command/delete_exchange.pm | 51 + lib/RabbitFoot/Cmd/Command/delete_queue.pm | 61 + lib/RabbitFoot/Cmd/Command/purge_queue.pm | 41 + lib/RabbitFoot/Cmd/Role/Command.pm | 131 + lib/RabbitFoot/Cmd/Role/Config.pm | 35 + rabbit_foot | 11 + t/00_compile.t | 13 + xt/01_podspell.t | 13 + xt/02_perlcritic.t | 8 + xt/03_pod.t | 4 + xt/04_use_server.t | 92 + xt/perlcriticrc | 8 + 25 files changed, 5384 insertions(+) create mode 100644 .gitignore create mode 100644 .shipit create mode 100644 Changes create mode 100644 MANIFEST.SKIP create mode 100644 Makefile.PL create mode 100644 README create mode 100644 amqp0-8.xml create mode 100644 config.json create mode 100644 lib/RabbitFoot.pm create mode 100644 lib/RabbitFoot/Cmd.pm create mode 100644 lib/RabbitFoot/Cmd/Command/bind_queue.pm create mode 100644 lib/RabbitFoot/Cmd/Command/declare_exchange.pm create mode 100644 lib/RabbitFoot/Cmd/Command/declare_queue.pm create mode 100644 lib/RabbitFoot/Cmd/Command/delete_exchange.pm create mode 100644 lib/RabbitFoot/Cmd/Command/delete_queue.pm create mode 100644 lib/RabbitFoot/Cmd/Command/purge_queue.pm create mode 100644 lib/RabbitFoot/Cmd/Role/Command.pm create mode 100644 lib/RabbitFoot/Cmd/Role/Config.pm create mode 100755 rabbit_foot create mode 100644 t/00_compile.t create mode 100644 xt/01_podspell.t create mode 100644 xt/02_perlcritic.t create mode 100644 xt/03_pod.t create mode 100644 xt/04_use_server.t create mode 100644 xt/perlcriticrc diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bfd20cf --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +cover_db +META.yml +Makefile +blib +inc +pm_to_blib +MANIFEST +Makefile.old diff --git a/.shipit b/.shipit new file mode 100644 index 0000000..dba08a4 --- /dev/null +++ b/.shipit @@ -0,0 +1,2 @@ +steps = FindVersion, ChangeVersion, CheckChangeLog, DistTest, Commit, Tag, MakeDist +svk.tagpattern = release-%v diff --git a/Changes b/Changes new file mode 100644 index 0000000..966db6d --- /dev/null +++ b/Changes @@ -0,0 +1,4 @@ +Revision history for Perl extension RabbitFoot + +0.01 Sun Dec 6 20:54:03 2009 + - original version diff --git a/MANIFEST.SKIP b/MANIFEST.SKIP new file mode 100644 index 0000000..27a635e --- /dev/null +++ b/MANIFEST.SKIP @@ -0,0 +1,19 @@ +\bRCS\b +\bCVS\b +^MANIFEST\. +^Makefile$ +~$ +^# +\.old$ +^blib/ +^pm_to_blib +^MakeMaker-\d +\.gz$ +\.cvsignore +^t/9\d_.*\.t +^t/perlcritic +^tools/ +\.svn/ +^[^/]+\.yaml$ +^[^/]+\.pl$ +^\.shipit$ diff --git a/Makefile.PL b/Makefile.PL new file mode 100644 index 0000000..36a390d --- /dev/null +++ b/Makefile.PL @@ -0,0 +1,21 @@ +use inc::Module::Install; +name 'RabbitFoot'; +all_from 'lib/RabbitFoot.pm'; + +requires 'Moose'; +requires 'MooseX::App::Cmd'; +requires 'MooseX::ConfigFromFile'; +requires 'Config::Any'; +requires 'JSON::Syck'; +requires 'List::MoreUtils'; +requires 'Sys::SigAction'; +requires 'Net::AMQP'; + +tests 't/*.t'; +author_tests 'xt'; + +build_requires 'Test::More'; +build_requires 'Test::Exception'; +use_test_base; +auto_include; +WriteAll; diff --git a/README b/README new file mode 100644 index 0000000..22fea9c --- /dev/null +++ b/README @@ -0,0 +1,27 @@ +This is Perl module RabbitFoot. + +INSTALLATION + +RabbitFoot installation is straightforward. If your CPAN shell is set up, +you should just be able to do + + % cpan RabbitFoot + +Download it, unpack it, then build it as per the usual: + + % perl Makefile.PL + % make && make test + +Then install it: + + % make install + +DOCUMENTATION + +RabbitFoot documentation is available as in POD. So you can do: + + % perldoc RabbitFoot + +to read the documentation online with your favorite pager. + +Masahito Ikuta diff --git a/amqp0-8.xml b/amqp0-8.xml new file mode 100644 index 0000000..8bc1221 --- /dev/null +++ b/amqp0-8.xml @@ -0,0 +1,3908 @@ + + + + + + AMQ Protocol 0.80 + + + + + + + + + + + + + Indicates that the method completed successfully. This reply code is + reserved for future use - the current protocol design does not use + positive confirmation and reply codes are sent only in case of an + error. + + + The client asked for a specific message that is no longer available. + The message was delivered to another client, or was purged from the + queue for some other reason. + + + The client attempted to transfer content larger than the server + could accept at the present time. The client may retry at a later + time. + + + An operator intervened to close the connection for some reason. + The client may retry at some later date. + + + The client tried to work with an unknown virtual host or cluster. + + + The client attempted to work with a server entity to which it has + no due to security settings. + + + The client attempted to work with a server entity that does not exist. + + + The client attempted to work with a server entity to which it has + no access because another client is working with it. + + + The client sent a malformed frame that the server could not decode. + This strongly implies a programming error in the client. + + + The client sent a frame that contained illegal values for one or more + fields. This strongly implies a programming error in the client. + + + The client sent an invalid sequence of frames, attempting to perform + an operation that was considered invalid by the server. This usually + implies a programming error in the client. + + + The client attempted to work with a channel that had not been + correctly opened. This most likely indicates a fault in the client + layer. + + + The server could not complete the method because it lacked sufficient + resources. This may be due to the client creating too many of some + type of entity. + + + The client tried to work with some entity in a manner that is + prohibited by the server, due to security settings or by some other + criteria. + + + The client tried to use functionality that is not implemented in the + server. + + + The server could not complete the method because of an internal error. + The server may require intervention by an operator in order to resume + normal operations. + + + + access ticket granted by server + + An access ticket granted by the server for a certain set of access + rights within a specific realm. Access tickets are valid within the + channel where they were created, and expire when the channel closes. + + + + + + consumer tag + + Identifier for the consumer, valid within the current connection. + + + The consumer tag is valid only within the channel from which the + consumer was created. I.e. a client MUST NOT create a consumer in + one channel and then use it in another. + + + + server-assigned delivery tag + + The server-assigned and channel-specific delivery tag + + + The delivery tag is valid only within the channel from which the + message was received. I.e. a client MUST NOT receive a message on + one channel and then acknowledge it on another. + + + The server MUST NOT use a zero value for delivery tags. Zero is + reserved for client use, meaning "all messages so far received". + + + + exchange name + + The exchange name is a client-selected string that identifies + the exchange for publish methods. Exchange names may consist + of any mixture of digits, letters, and underscores. Exchange + names are scoped by the virtual host. + + + + +list of known hosts + +Specifies the list of equivalent or alternative hosts that the server +knows about, which will normally include the current server itself. +Clients can cache this information and use it when reconnecting to a +server after a failure. + + +The server MAY leave this field empty if it knows of no other +hosts than itself. + + + + + no acknowledgement needed + + If this field is set the server does not expect acknowledgments + for messages. That is, when a message is delivered to the client + the server automatically and silently acknowledges it on behalf + of the client. This functionality increases performance but at + the cost of reliability. Messages can get lost if a client dies + before it can deliver them to the application. + + + + do not deliver own messages + + If the no-local field is set the server will not send messages to + the client that published them. + + + + + Must start with a slash "/" and continue with path names + separated by slashes. A path name consists of any combination + of at least one of [A-Za-z0-9] plus zero or more of [.-_+!=:]. + + + + + + + +This string provides a set of peer properties, used for +identification, debugging, and general information. + + +The properties SHOULD contain these fields: +"product", giving the name of the peer product, "version", giving +the name of the peer version, "platform", giving the name of the +operating system, "copyright", if appropriate, and "information", +giving other general information. + + + + queue name + + The queue name identifies the queue within the vhost. Queue + names may consist of any mixture of digits, letters, and + underscores. + + + + + message is being redelivered + + This indicates that the message has been previously delivered to + this or another client. + + + The server SHOULD try to signal redelivered messages when it can. + When redelivering a message that was not successfully acknowledged, + the server SHOULD deliver it to the original client if possible. + + + The client MUST NOT rely on the redelivered field but MUST take it + as a hint that the message may already have been processed. A + fully robust client must be able to track duplicate received messages + on non-transacted, and locally-transacted channels. + + + +reply code from server + + The reply code. The AMQ reply codes are defined in AMQ RFC 011. + + + + +localised reply text + + The localised reply text. This text can be logged as an aid to + resolving issues. + + + + + + work with socket connections + + The connection class provides methods for a client to establish a + network connection to a server, and for both peers to operate the + connection thereafter. + + + connection = open-connection *use-connection close-connection + open-connection = C:protocol-header + S:START C:START-OK + *challenge + S:TUNE C:TUNE-OK + C:OPEN S:OPEN-OK | S:REDIRECT + challenge = S:SECURE C:SECURE-OK + use-connection = *channel + close-connection = C:CLOSE S:CLOSE-OK + / S:CLOSE C:CLOSE-OK + + + + + + start connection negotiation + + This method starts the connection negotiation process by telling + the client the protocol version that the server proposes, along + with a list of security mechanisms which the client can use for + authentication. + + + If the client cannot handle the protocol version suggested by the + server it MUST close the socket connection. + + + The server MUST provide a protocol version that is lower than or + equal to that requested by the client in the protocol header. If + the server cannot support the specified protocol it MUST NOT send + this method, but MUST close the socket connection. + + + + + protocol major version + + The protocol major version that the server agrees to use, which + cannot be higher than the client's major version. + + + + protocol major version + + The protocol minor version that the server agrees to use, which + cannot be higher than the client's minor version. + + + + server properties + + + available security mechanisms + + A list of the security mechanisms that the server supports, delimited + by spaces. Currently ASL supports these mechanisms: PLAIN. + + + + + + available message locales + + A list of the message locales that the server supports, delimited + by spaces. The locale defines the language in which the server + will send reply texts. + + + All servers MUST support at least the en_US locale. + + + + + + select security mechanism and locale + + This method selects a SASL security mechanism. ASL uses SASL + (RFC2222) to negotiate authentication and encryption. + + + + client properties + + + selected security mechanism + + A single security mechanisms selected by the client, which must be + one of those specified by the server. + + + The client SHOULD authenticate using the highest-level security + profile it can handle from the list provided by the server. + + + The mechanism field MUST contain one of the security mechanisms + proposed by the server in the Start method. If it doesn't, the + server MUST close the socket. + + + + + security response data + + A block of opaque data passed to the security mechanism. The contents + of this data are defined by the SASL security mechanism. For the + PLAIN security mechanism this is defined as a field table holding + two fields, LOGIN and PASSWORD. + + + + + selected message locale + + A single message local selected by the client, which must be one + of those specified by the server. + + + + + + + security mechanism challenge + + The SASL protocol works by exchanging challenges and responses until + both peers have received sufficient information to authenticate each + other. This method challenges the client to provide more information. + + + + + security challenge data + + Challenge information, a block of opaque binary data passed to + the security mechanism. + + + + + + security mechanism response + + This method attempts to authenticate, passing a block of SASL data + for the security mechanism at the server side. + + + + security response data + + A block of opaque data passed to the security mechanism. The contents + of this data are defined by the SASL security mechanism. + + + + + + + propose connection tuning parameters + + This method proposes a set of connection configuration values + to the client. The client can accept and/or adjust these. + + + + + proposed maximum channels + + The maximum total number of channels that the server allows + per connection. Zero means that the server does not impose a + fixed limit, but the number of allowed channels may be limited + by available server resources. + + + + proposed maximum frame size + + The largest frame size that the server proposes for the + connection. The client can negotiate a lower value. Zero means + that the server does not impose any specific limit but may reject + very large frames if it cannot allocate resources for them. + + + Until the frame-max has been negotiated, both peers MUST accept + frames of up to 4096 octets large. The minimum non-zero value for + the frame-max field is 4096. + + + + desired heartbeat delay + + The delay, in seconds, of the connection heartbeat that the server + wants. Zero means the server does not want a heartbeat. + + + + + negotiate connection tuning parameters + + This method sends the client's connection tuning parameters to the + server. Certain fields are negotiated, others provide capability + information. + + + + negotiated maximum channels + + The maximum total number of channels that the client will use + per connection. May not be higher than the value specified by + the server. + + + The server MAY ignore the channel-max value or MAY use it for + tuning its resource allocation. + + + + + + negotiated maximum frame size + + The largest frame size that the client and server will use for + the connection. Zero means that the client does not impose any + specific limit but may reject very large frames if it cannot + allocate resources for them. Note that the frame-max limit + applies principally to content frames, where large contents + can be broken into frames of arbitrary size. + + + Until the frame-max has been negotiated, both peers must accept + frames of up to 4096 octets large. The minimum non-zero value for + the frame-max field is 4096. + + + + desired heartbeat delay + + The delay, in seconds, of the connection heartbeat that the client + wants. Zero means the client does not want a heartbeat. + + + + + + open connection to virtual host + + This method opens a connection to a virtual host, which is a + collection of resources, and acts to separate multiple application + domains within a server. + + + The client MUST open the context before doing any work on the + connection. + + + + + + virtual host name + + + The name of the virtual host to work with. + + + If the server supports multiple virtual hosts, it MUST enforce a + full separation of exchanges, queues, and all associated entities + per virtual host. An application, connected to a specific virtual + host, MUST NOT be able to access resources of another virtual host. + + + The server SHOULD verify that the client has permission to access + the specified virtual host. + + + The server MAY configure arbitrary limits per virtual host, such + as the number of each type of entity that may be used, per + connection and/or in total. + + + + required capabilities + + The client may specify a number of capability names, delimited by + spaces. The server can use this string to how to process the + client's connection request. + + + + insist on connecting to server + + In a configuration with multiple load-sharing servers, the server + may respond to a Connection.Open method with a Connection.Redirect. + The insist option tells the server that the client is insisting on + a connection to the specified server. + + + When the client uses the insist option, the server SHOULD accept + the client connection unless it is technically unable to do so. + + + + + signal that the connection is ready + + This method signals to the client that the connection is ready for + use. + + + + + + asks the client to use a different server + + This method redirects the client to another server, based on the + requested virtual host and/or capabilities. + + + When getting the Connection.Redirect method, the client SHOULD + reconnect to the host specified, and if that host is not present, + to any of the hosts specified in the known-hosts list. + + + + server to connect to + + Specifies the server to connect to. This is an IP address or a + DNS name, optionally followed by a colon and a port number. If + no port number is specified, the client should use the default + port number for the protocol. + + + + + + + + request a connection close + + This method indicates that the sender wants to close the connection. + This may be due to internal conditions (e.g. a forced shut-down) or + due to an error handling a specific method, i.e. an exception. When + a close is due to an exception, the sender provides the class and + method id of the method which caused the exception. + + + After sending this method any received method except the Close-OK + method MUST be discarded. + + + The peer sending this method MAY use a counter or timeout to + detect failure of the other peer to respond correctly with + the Close-OK method. + + + When a server receives the Close method from a client it MUST + delete all server-side resources associated with the client's + context. A client CANNOT reconnect to a context after sending + or receiving a Close method. + + + + + + + + failing method class + + When the close is provoked by a method exception, this is the + class of the method. + + + + failing method ID + + When the close is provoked by a method exception, this is the + ID of the method. + + + + + confirm a connection close + + This method confirms a Connection.Close method and tells the + recipient that it is safe to release resources for the connection + and close the socket. + + + A peer that detects a socket closure without having received a + Close-Ok handshake method SHOULD log the error. + + + + + + + + work with channels + + The channel class provides methods for a client to establish a virtual + connection - a channel - to a server and for both peers to operate the + virtual connection thereafter. + + + channel = open-channel *use-channel close-channel + open-channel = C:OPEN S:OPEN-OK + use-channel = C:FLOW S:FLOW-OK + / S:FLOW C:FLOW-OK + / S:ALERT + / functional-class + close-channel = C:CLOSE S:CLOSE-OK + / S:CLOSE C:CLOSE-OK + + + + + + open a channel for use + + This method opens a virtual connection (a channel). + + + This method MUST NOT be called when the channel is already open. + + + + + out-of-band settings + + Configures out-of-band transfers on this channel. The syntax and + meaning of this field will be formally defined at a later date. + + + + + + signal that the channel is ready + + This method signals to the client that the channel is ready for use. + + + + + + enable/disable flow from peer + + This method asks the peer to pause or restart the flow of content + data. This is a simple flow-control mechanism that a peer can use + to avoid oveflowing its queues or otherwise finding itself receiving + more messages than it can process. Note that this method is not + intended for window control. The peer that receives a request to + stop sending content should finish sending the current content, if + any, and then wait until it receives a Flow restart method. + + + When a new channel is opened, it is active. Some applications + assume that channels are inactive until started. To emulate this + behaviour a client MAY open the channel, then pause it. + + + When sending content data in multiple frames, a peer SHOULD monitor + the channel for incoming methods and respond to a Channel.Flow as + rapidly as possible. + + + A peer MAY use the Channel.Flow method to throttle incoming content + data for internal reasons, for example, when exchangeing data over a + slower connection. + + + The peer that requests a Channel.Flow method MAY disconnect and/or + ban a peer that does not respect the request. + + + + + + start/stop content frames + + If 1, the peer starts sending content frames. If 0, the peer + stops sending content frames. + + + + + confirm a flow method + + Confirms to the peer that a flow command was received and processed. + + + + + current flow setting + + Confirms the setting of the processed flow method: 1 means the + peer will start sending or continue to send content frames; 0 + means it will not. + + + + + + send a non-fatal warning message + + This method allows the server to send a non-fatal warning to the + client. This is used for methods that are normally asynchronous + and thus do not have confirmations, and for which the server may + detect errors that need to be reported. Fatal errors are handled + as channel or connection exceptions; non-fatal errors are sent + through this method. + + + + + + detailed information for warning + + A set of fields that provide more information about the + problem. The meaning of these fields are defined on a + per-reply-code basis (TO BE DEFINED). + + + + + + request a channel close + + This method indicates that the sender wants to close the channel. + This may be due to internal conditions (e.g. a forced shut-down) or + due to an error handling a specific method, i.e. an exception. When + a close is due to an exception, the sender provides the class and + method id of the method which caused the exception. + + + After sending this method any received method except + Channel.Close-OK MUST be discarded. + + + The peer sending this method MAY use a counter or timeout to detect + failure of the other peer to respond correctly with Channel.Close-OK.. + + + + + + + + failing method class + + When the close is provoked by a method exception, this is the + class of the method. + + + + failing method ID + + When the close is provoked by a method exception, this is the + ID of the method. + + + + + confirm a channel close + + This method confirms a Channel.Close method and tells the recipient + that it is safe to release resources for the channel and close the + socket. + + + A peer that detects a socket closure without having received a + Channel.Close-Ok handshake method SHOULD log the error. + + + + + + + + work with access tickets + + The protocol control access to server resources using access tickets. + A client must explicitly request access tickets before doing work. + An access ticket grants a client the right to use a specific set of + resources - called a "realm" - in specific ways. + + + access = C:REQUEST S:REQUEST-OK + + + + + + request an access ticket + + This method requests an access ticket for an access realm. + The server responds by granting the access ticket. If the + client does not have access rights to the requested realm + this causes a connection exception. Access tickets are a + per-channel resource. + + + The realm name MUST start with either "/data" (for application + resources) or "/admin" (for server administration resources). + If the realm starts with any other path, the server MUST raise + a connection exception with reply code 403 (access refused). + + + The server MUST implement the /data realm and MAY implement the + /admin realm. The mapping of resources to realms is not + defined in the protocol - this is a server-side configuration + issue. + + + + + name of requested realm + + If the specified realm is not known to the server, the server + must raise a channel exception with reply code 402 (invalid + path). + + + + request exclusive access + + Request exclusive access to the realm. If the server cannot grant + this - because there are other active tickets for the realm - it + raises a channel exception. + + + + request passive access + + Request message passive access to the specified access realm. + Passive access lets a client get information about resources in + the realm but not to make any changes to them. + + + + request active access + + Request message active access to the specified access realm. + Acvtive access lets a client get create and delete resources in + the realm. + + + + request write access + + Request write access to the specified access realm. Write access + lets a client publish messages to all exchanges in the realm. + + + + request read access + + Request read access to the specified access realm. Read access + lets a client consume messages from queues in the realm. + + + + + grant access to server resources + + This method provides the client with an access ticket. The access + ticket is valid within the current channel and for the lifespan of + the channel. + + + The client MUST NOT use access tickets except within the same + channel as originally granted. + + + The server MUST isolate access tickets per channel and treat an + attempt by a client to mix these as a connection exception. + + + + + + + + work with exchanges + + Exchanges match and distribute messages across queues. Exchanges can be + configured in the server or created at runtime. + + + exchange = C:DECLARE S:DECLARE-OK + / C:DELETE S:DELETE-OK + + + + + amq_exchange_19 + The server MUST implement the direct and fanout exchange types, and + predeclare the corresponding exchanges named amq.direct and amq.fanout + in each virtual host. The server MUST also predeclare a direct + exchange to act as the default exchange for content Publish methods + and for default queue bindings. + + + amq_exchange_20 + The server SHOULD implement the topic exchange type, and predeclare + the corresponding exchange named amq.topic in each virtual host. + + + amq_exchange_21 + The server MAY implement the system exchange type, and predeclare the + corresponding exchanges named amq.system in each virtual host. If the + client attempts to bind a queue to the system exchange, the server + MUST raise a connection exception with reply code 507 (not allowed). + + + amq_exchange_22 + The default exchange MUST be defined as internal, and be inaccessible + to the client except by specifying an empty exchange name in a content + Publish method. That is, the server MUST NOT let clients make explicit + bindings to this exchange. + + + + declare exchange, create if needed + + This method creates an exchange if it does not already exist, and if the + exchange exists, verifies that it is of the correct and expected class. + + + amq_exchange_23 + The server SHOULD support a minimum of 16 exchanges per virtual host + and ideally, impose no limit except as defined by available resources. + + + + + + When a client defines a new exchange, this belongs to the access realm + of the ticket used. All further work done with that exchange must be + done with an access ticket for the same realm. + + + The client MUST provide a valid access ticket giving "active" access + to the realm in which the exchange exists or will be created, or + "passive" access if the if-exists flag is set. + + + + + amq_exchange_15 + Exchange names starting with "amq." are reserved for predeclared + and standardised exchanges. If the client attempts to create an + exchange starting with "amq.", the server MUST raise a channel + exception with reply code 403 (access refused). + + + + + exchange type + + Each exchange belongs to one of a set of exchange types implemented + by the server. The exchange types define the functionality of the + exchange - i.e. how messages are routed through it. It is not valid + or meaningful to attempt to change the type of an existing exchange. + + + amq_exchange_16 + If the exchange already exists with a different type, the server + MUST raise a connection exception with a reply code 507 (not allowed). + + + amq_exchange_18 + If the server does not support the requested exchange type it MUST + raise a connection exception with a reply code 503 (command invalid). + + + + + do not create exchange + + If set, the server will not create the exchange. The client can use + this to check whether an exchange exists without modifying the server + state. + + + amq_exchange_05 + If set, and the exchange does not already exist, the server MUST + raise a channel exception with reply code 404 (not found). + + + + request a durable exchange + + If set when creating a new exchange, the exchange will be marked as + durable. Durable exchanges remain active when a server restarts. + Non-durable exchanges (transient exchanges) are purged if/when a + server restarts. + + + amq_exchange_24 + The server MUST support both durable and transient exchanges. + + + The server MUST ignore the durable field if the exchange already + exists. + + + + auto-delete when unused + + If set, the exchange is deleted when all queues have finished + using it. + + + amq_exchange_02 + The server SHOULD allow for a reasonable delay between the point + when it determines that an exchange is not being used (or no longer + used), and the point when it deletes the exchange. At the least it + must allow a client to create an exchange and then bind a queue to + it, with a small but non-zero delay between these two actions. + + + amq_exchange_25 + The server MUST ignore the auto-delete field if the exchange already + exists. + + + + create internal exchange + + If set, the exchange may not be used directly by publishers, but + only when bound to other exchanges. Internal exchanges are used to + construct wiring that is not visible to applications. + + + + + do not send a reply method + + If set, the server will not respond to the method. The client should + not wait for a reply method. If the server could not complete the + method it will raise a channel or connection exception. + + + + + arguments for declaration + + A set of arguments for the declaration. The syntax and semantics + of these arguments depends on the server implementation. This + field is ignored if passive is 1. + + + + + confirms an exchange declaration + + This method confirms a Declare method and confirms the name of the + exchange, essential for automatically-named exchanges. + + + + + + delete an exchange + + This method deletes an exchange. When an exchange is deleted all queue + bindings on the exchange are cancelled. + + + + + + The client MUST provide a valid access ticket giving "active" + access rights to the exchange's access realm. + + + + + amq_exchange_11 + The exchange MUST exist. Attempting to delete a non-existing exchange + causes a channel exception. + + + + + delete only if unused + + If set, the server will only delete the exchange if it has no queue + bindings. If the exchange has queue bindings the server does not + delete it but raises a channel exception instead. + + + amq_exchange_12 + If set, the server SHOULD delete the exchange but only if it has + no queue bindings. + + + amq_exchange_13 + If set, the server SHOULD raise a channel exception if the exchange is in + use. + + + + + do not send a reply method + + If set, the server will not respond to the method. The client should + not wait for a reply method. If the server could not complete the + method it will raise a channel or connection exception. + + + + + + confirm deletion of an exchange + + This method confirms the deletion of an exchange. + + + + + + + work with queues + + + Queues store and forward messages. Queues can be configured in the server + or created at runtime. Queues must be attached to at least one exchange + in order to receive messages from publishers. + + + queue = C:DECLARE S:DECLARE-OK + / C:BIND S:BIND-OK + / C:PURGE S:PURGE-OK + / C:DELETE S:DELETE-OK + + + + + amq_queue_33 + A server MUST allow any content class to be sent to any queue, in any + mix, and queue and delivery these content classes independently. Note + that all methods that fetch content off queues are specific to a given + content class. + + + + declare queue, create if needed + + This method creates or checks a queue. When creating a new queue + the client can specify various properties that control the durability + of the queue and its contents, and the level of sharing for the queue. + + + amq_queue_34 + The server MUST create a default binding for a newly-created queue + to the default exchange, which is an exchange of type 'direct'. + + + amq_queue_35 + The server SHOULD support a minimum of 256 queues per virtual host + and ideally, impose no limit except as defined by available resources. + + + + + + When a client defines a new queue, this belongs to the access realm + of the ticket used. All further work done with that queue must be + done with an access ticket for the same realm. + + + The client provides a valid access ticket giving "active" access + to the realm in which the queue exists or will be created, or + "passive" access if the if-exists flag is set. + + + + + amq_queue_10 + The queue name MAY be empty, in which case the server MUST create + a new queue with a unique generated name and return this to the + client in the Declare-Ok method. + + + amq_queue_32 + Queue names starting with "amq." are reserved for predeclared and + standardised server queues. If the queue name starts with "amq." + and the passive option is zero, the server MUST raise a connection + exception with reply code 403 (access refused). + + + + + do not create queue + + If set, the server will not create the queue. The client can use + this to check whether a queue exists without modifying the server + state. + + + amq_queue_05 + If set, and the queue does not already exist, the server MUST + respond with a reply code 404 (not found) and raise a channel + exception. + + + + request a durable queue + + If set when creating a new queue, the queue will be marked as + durable. Durable queues remain active when a server restarts. + Non-durable queues (transient queues) are purged if/when a + server restarts. Note that durable queues do not necessarily + hold persistent messages, although it does not make sense to + send persistent messages to a transient queue. + + + amq_queue_03 + The server MUST recreate the durable queue after a restart. + + + amq_queue_36 + The server MUST support both durable and transient queues. + + + amq_queue_37 + The server MUST ignore the durable field if the queue already + exists. + + + + request an exclusive queue + + Exclusive queues may only be consumed from by the current connection. + Setting the 'exclusive' flag always implies 'auto-delete'. + + + amq_queue_38 + The server MUST support both exclusive (private) and non-exclusive + (shared) queues. + + + amq_queue_04 + The server MUST raise a channel exception if 'exclusive' is specified + and the queue already exists and is owned by a different connection. + + + + auto-delete queue when unused + + If set, the queue is deleted when all consumers have finished + using it. Last consumer can be cancelled either explicitly or because + its channel is closed. If there was no consumer ever on the queue, it + won't be deleted. + + + amq_queue_02 + The server SHOULD allow for a reasonable delay between the point + when it determines that a queue is not being used (or no longer + used), and the point when it deletes the queue. At the least it + must allow a client to create a queue and then create a consumer + to read from it, with a small but non-zero delay between these + two actions. The server should equally allow for clients that may + be disconnected prematurely, and wish to re-consume from the same + queue without losing messages. We would recommend a configurable + timeout, with a suitable default value being one minute. + + + amq_queue_31 + The server MUST ignore the auto-delete field if the queue already + exists. + + + + do not send a reply method + + If set, the server will not respond to the method. The client should + not wait for a reply method. If the server could not complete the + method it will raise a channel or connection exception. + + + + + arguments for declaration + + A set of arguments for the declaration. The syntax and semantics + of these arguments depends on the server implementation. This + field is ignored if passive is 1. + + + + + confirms a queue definition + + This method confirms a Declare method and confirms the name of the + queue, essential for automatically-named queues. + + + + + Reports the name of the queue. If the server generated a queue + name, this field contains that name. + + + + + number of messages in queue + + Reports the number of messages in the queue, which will be zero + for newly-created queues. + + + + number of consumers + + Reports the number of active consumers for the queue. Note that + consumers can suspend activity (Channel.Flow) in which case they + do not appear in this count. + + + + + + bind queue to an exchange + + This method binds a queue to an exchange. Until a queue is + bound it will not receive any messages. In a classic messaging + model, store-and-forward queues are bound to a dest exchange + and subscription queues are bound to a dest_wild exchange. + + + amq_queue_25 + A server MUST allow ignore duplicate bindings - that is, two or + more bind methods for a specific queue, with identical arguments + - without treating these as an error. + + + amq_queue_39 + If a bind fails, the server MUST raise a connection exception. + + + amq_queue_12 + The server MUST NOT allow a durable queue to bind to a transient + exchange. If the client attempts this the server MUST raise a + channel exception. + + + amq_queue_13 + Bindings for durable queues are automatically durable and the + server SHOULD restore such bindings after a server restart. + + + amq_queue_17 + If the client attempts to an exchange that was declared as internal, + the server MUST raise a connection exception with reply code 530 + (not allowed). + + + amq_queue_40 + The server SHOULD support at least 4 bindings per queue, and + ideally, impose no limit except as defined by available resources. + + + + + + The client provides a valid access ticket giving "active" + access rights to the queue's access realm. + + + + + + Specifies the name of the queue to bind. If the queue name is + empty, refers to the current queue for the channel, which is + the last declared queue. + + + If the client did not previously declare a queue, and the queue + name in this method is empty, the server MUST raise a connection + exception with reply code 530 (not allowed). + + + If the queue does not exist the server MUST raise a channel exception + with reply code 404 (not found). + + + + + The name of the exchange to bind to. + + amq_queue_14 + If the exchange does not exist the server MUST raise a channel + exception with reply code 404 (not found). + + + + message routing key + + Specifies the routing key for the binding. The routing key is + used for routing messages depending on the exchange configuration. + Not all exchanges use a routing key - refer to the specific + exchange documentation. If the routing key is empty and the queue + name is empty, the routing key will be the current queue for the + channel, which is the last declared queue. + + + + + do not send a reply method + + If set, the server will not respond to the method. The client should + not wait for a reply method. If the server could not complete the + method it will raise a channel or connection exception. + + + + + arguments for binding + + A set of arguments for the binding. The syntax and semantics of + these arguments depends on the exchange class. + + + + + confirm bind successful + + This method confirms that the bind was successful. + + + + + + purge a queue + + This method removes all messages from a queue. It does not cancel + consumers. Purged messages are deleted without any formal "undo" + mechanism. + + + amq_queue_15 + A call to purge MUST result in an empty queue. + + + amq_queue_41 + On transacted channels the server MUST not purge messages that have + already been sent to a client but not yet acknowledged. + + + amq_queue_42 + The server MAY implement a purge queue or log that allows system + administrators to recover accidentally-purged messages. The server + SHOULD NOT keep purged messages in the same storage spaces as the + live messages since the volumes of purged messages may get very + large. + + + + + + The access ticket must be for the access realm that holds the + queue. + + + The client MUST provide a valid access ticket giving "read" access + rights to the queue's access realm. Note that purging a queue is + equivalent to reading all messages and discarding them. + + + + + Specifies the name of the queue to purge. If the queue name is + empty, refers to the current queue for the channel, which is + the last declared queue. + + + If the client did not previously declare a queue, and the queue + name in this method is empty, the server MUST raise a connection + exception with reply code 530 (not allowed). + + + The queue must exist. Attempting to purge a non-existing queue + causes a channel exception. + + + + + do not send a reply method + + If set, the server will not respond to the method. The client should + not wait for a reply method. If the server could not complete the + method it will raise a channel or connection exception. + + + + + confirms a queue purge + + This method confirms the purge of a queue. + + + + number of messages purged + + Reports the number of messages purged. + + + + + + delete a queue + + This method deletes a queue. When a queue is deleted any pending + messages are sent to a dead-letter queue if this is defined in the + server configuration, and all consumers on the queue are cancelled. + + + amq_queue_43 + The server SHOULD use a dead-letter queue to hold messages that + were pending on a deleted queue, and MAY provide facilities for + a system administrator to move these messages back to an active + queue. + + + + + + The client provides a valid access ticket giving "active" + access rights to the queue's access realm. + + + + + + Specifies the name of the queue to delete. If the queue name is + empty, refers to the current queue for the channel, which is the + last declared queue. + + + If the client did not previously declare a queue, and the queue + name in this method is empty, the server MUST raise a connection + exception with reply code 530 (not allowed). + + + The queue must exist. Attempting to delete a non-existing queue + causes a channel exception. + + + + + delete only if unused + + If set, the server will only delete the queue if it has no + consumers. If the queue has consumers the server does does not + delete it but raises a channel exception instead. + + + amq_queue_29 + amq_queue_30 + The server MUST respect the if-unused flag when deleting a queue. + + + + delete only if empty + amq_queue_27 + + If set, the server will only delete the queue if it has no + messages. If the queue is not empty the server raises a channel + exception. + + + + do not send a reply method + + If set, the server will not respond to the method. The client should + not wait for a reply method. If the server could not complete the + method it will raise a channel or connection exception. + + + + + + confirm deletion of a queue + + This method confirms the deletion of a queue. + + + + number of messages purged + + Reports the number of messages purged. + + + + + + + work with basic content + + The Basic class provides methods that support an industry-standard + messaging model. + + + + basic = C:QOS S:QOS-OK + / C:CONSUME S:CONSUME-OK + / C:CANCEL S:CANCEL-OK + / C:PUBLISH content + / S:RETURN content + / S:DELIVER content + / C:GET ( S:GET-OK content / S:GET-EMPTY ) + / C:ACK + / C:REJECT + + + + + + + The server SHOULD respect the persistent property of basic messages + and SHOULD make a best-effort to hold persistent basic messages on a + reliable storage mechanism. + + + The server MUST NOT discard a persistent basic message in case of a + queue overflow. The server MAY use the Channel.Flow method to slow + or stop a basic message publisher when necessary. + + + The server MAY overflow non-persistent basic messages to persistent + storage and MAY discard or dead-letter non-persistent basic messages + on a priority basis if the queue size exceeds some configured limit. + + + The server MUST implement at least 2 priority levels for basic + messages, where priorities 0-4 and 5-9 are treated as two distinct + levels. The server MAY implement up to 10 priority levels. + + + The server MUST deliver messages of the same priority in order + irrespective of their individual persistence. + + + The server MUST support both automatic and explicit acknowledgements + on Basic content. + + + + + + MIME content type + + + MIME content encoding + + + Message header field table + + + Non-persistent (1) or persistent (2) + + + The message priority, 0 to 9 + + + The application correlation identifier + + + The destination to reply to + + + Message expiration specification + + + The application message identifier + + + The message timestamp + + + The message type name + + + The creating user id + + + The creating application id + + + Intra-cluster routing identifier + + + + + + + specify quality of service + + This method requests a specific quality of service. The QoS can + be specified for the current channel or for all channels on the + connection. The particular properties and semantics of a qos method + always depend on the content class semantics. Though the qos method + could in principle apply to both peers, it is currently meaningful + only for the server. + + + + + + prefetch window in octets + + The client can request that messages be sent in advance so that + when the client finishes processing a message, the following + message is already held locally, rather than needing to be sent + down the channel. Prefetching gives a performance improvement. + This field specifies the prefetch window size in octets. The + server will send a message in advance if it is equal to or + smaller in size than the available prefetch size (and also falls + into other prefetch limits). May be set to zero, meaning "no + specific limit", although other prefetch limits may still apply. + The prefetch-size is ignored if the no-ack option is set. + + + The server MUST ignore this setting when the client is not + processing any messages - i.e. the prefetch size does not limit + the transfer of single messages to a client, only the sending in + advance of more messages while the client still has one or more + unacknowledged messages. + + + + + prefetch window in messages + + Specifies a prefetch window in terms of whole messages. This + field may be used in combination with the prefetch-size field; + a message will only be sent in advance if both prefetch windows + (and those at the channel and connection level) allow it. + The prefetch-count is ignored if the no-ack option is set. + + + The server MAY send less data in advance than allowed by the + client's specified prefetch windows but it MUST NOT send more. + + + + + apply to entire connection + + By default the QoS settings apply to the current channel only. If + this field is set, they are applied to the entire connection. + + + + + + confirm the requested qos + + This method tells the client that the requested QoS levels could + be handled by the server. The requested QoS applies to all active + consumers until a new QoS is defined. + + + + + + + + start a queue consumer + + This method asks the server to start a "consumer", which is a + transient request for messages from a specific queue. Consumers + last as long as the channel they were created on, or until the + client cancels them. + + + The server SHOULD support at least 16 consumers per queue, unless + the queue was declared as private, and ideally, impose no limit + except as defined by available resources. + + + + + + + The client MUST provide a valid access ticket giving "read" access + rights to the realm for the queue. + + + + + + Specifies the name of the queue to consume from. If the queue name + is null, refers to the current queue for the channel, which is the + last declared queue. + + + If the client did not previously declare a queue, and the queue name + in this method is empty, the server MUST raise a connection exception + with reply code 530 (not allowed). + + + + + + Specifies the identifier for the consumer. The consumer tag is + local to a connection, so two clients can use the same consumer + tags. If this field is empty the server will generate a unique + tag. + + + The tag MUST NOT refer to an existing consumer. If the client + attempts to create two consumers with the same non-empty tag + the server MUST raise a connection exception with reply code + 530 (not allowed). + + + + + + + + + request exclusive access + + Request exclusive consumer access, meaning only this consumer can + access the queue. + + + If the server cannot grant exclusive access to the queue when asked, + - because there are other consumers active - it MUST raise a channel + exception with return code 403 (access refused). + + + + + do not send a reply method + + If set, the server will not respond to the method. The client should + not wait for a reply method. If the server could not complete the + method it will raise a channel or connection exception. + + + + + + confirm a new consumer + + The server provides the client with a consumer tag, which is used + by the client for methods called on the consumer at a later stage. + + + + + + Holds the consumer tag specified by the client or provided by + the server. + + + + + + + + + end a queue consumer + + This method cancels a consumer. This does not affect already + delivered messages, but it does mean the server will not send any + more messages for that consumer. The client may receive an + abitrary number of messages in between sending the cancel method + and receiving the cancel-ok reply. + + + If the queue no longer exists when the client sends a cancel command, + or the consumer has been cancelled for other reasons, this command + has no effect. + + + + + + + + do not send a reply method + + If set, the server will not respond to the method. The client should + not wait for a reply method. If the server could not complete the + method it will raise a channel or connection exception. + + + + + + confirm a cancelled consumer + + This method confirms that the cancellation was completed. + + + + + + + + + + + publish a message + + This method publishes a message to a specific exchange. The message + will be routed to queues as defined by the exchange configuration + and distributed to any active consumers when the transaction, if any, + is committed. + + + + + + The client MUST provide a valid access ticket giving "write" + access rights to the access realm for the exchange. + + + + + + Specifies the name of the exchange to publish to. The exchange + name can be empty, meaning the default exchange. If the exchange + name is specified, and that exchange does not exist, the server + will raise a channel exception. + + + The server MUST accept a blank exchange name to mean the default + exchange. + + + If the exchange was declared as an internal exchange, the server + MUST raise a channel exception with a reply code 403 (access + refused). + + + The exchange MAY refuse basic content in which case it MUST raise + a channel exception with reply code 540 (not implemented). + + + + + Message routing key + + Specifies the routing key for the message. The routing key is + used for routing messages depending on the exchange configuration. + + + + + indicate mandatory routing + + This flag tells the server how to react if the message cannot be + routed to a queue. If this flag is set, the server will return an + unroutable message with a Return method. If this flag is zero, the + server silently drops the message. + + + The server SHOULD implement the mandatory flag. + + + + + request immediate delivery + + This flag tells the server how to react if the message cannot be + routed to a queue consumer immediately. If this flag is set, the + server will return an undeliverable message with a Return method. + If this flag is zero, the server will queue the message, but with + no guarantee that it will ever be consumed. + + + The server SHOULD implement the immediate flag. + + + + + + return a failed message + + This method returns an undeliverable message that was published + with the "immediate" flag set, or an unroutable message published + with the "mandatory" flag set. The reply code and text provide + information about the reason that the message was undeliverable. + + + + + + + + + Specifies the name of the exchange that the message was + originally published to. + + + + + Message routing key + + Specifies the routing key name specified when the message was + published. + + + + + + + + + notify the client of a consumer message + + This method delivers a message to the client, via a consumer. In + the asynchronous message delivery model, the client starts a + consumer using the Consume method, then the server responds with + Deliver methods as and when messages arrive for that consumer. + + + The server SHOULD track the number of times a message has been + delivered to clients and when a message is redelivered a certain + number of times - e.g. 5 times - without being acknowledged, the + server SHOULD consider the message to be unprocessable (possibly + causing client applications to abort), and move the message to a + dead letter queue. + + + + + + + + + + + + Specifies the name of the exchange that the message was + originally published to. + + + + + Message routing key + + Specifies the routing key name specified when the message was + published. + + + + + + + + + direct access to a queue + + This method provides a direct access to the messages in a queue + using a synchronous dialogue that is designed for specific types of + application where synchronous functionality is more important than + performance. + + + + + + + + The client MUST provide a valid access ticket giving "read" + access rights to the realm for the queue. + + + + + + Specifies the name of the queue to consume from. If the queue name + is null, refers to the current queue for the channel, which is the + last declared queue. + + + If the client did not previously declare a queue, and the queue name + in this method is empty, the server MUST raise a connection exception + with reply code 530 (not allowed). + + + + + + + + provide client with a message + + This method delivers a message to the client following a get + method. A message delivered by 'get-ok' must be acknowledged + unless the no-ack option was set in the get method. + + + + + + + + + + Specifies the name of the exchange that the message was originally + published to. If empty, the message was published to the default + exchange. + + + + + Message routing key + + Specifies the routing key name specified when the message was + published. + + + + + number of messages pending + + This field reports the number of messages pending on the queue, + excluding the message being delivered. Note that this figure is + indicative, not reliable, and can change arbitrarily as messages + are added to the queue and removed by other clients. + + + + + + + indicate no messages available + + This method tells the client that the queue has no messages + available for the client. + + + + + Cluster id + + For use by cluster applications, should not be used by + client applications. + + + + + + + + acknowledge one or more messages + + This method acknowledges one or more messages delivered via the + Deliver or Get-Ok methods. The client can ask to confirm a + single message or a set of messages up to and including a specific + message. + + + + + + acknowledge multiple messages + + If set to 1, the delivery tag is treated as "up to and including", + so that the client can acknowledge multiple messages with a single + method. If set to zero, the delivery tag refers to a single + message. If the multiple field is 1, and the delivery tag is zero, + tells the server to acknowledge all outstanding mesages. + + + The server MUST validate that a non-zero delivery-tag refers to an + delivered message, and raise a channel exception if this is not the + case. + + + + + + + + reject an incoming message + + This method allows a client to reject a message. It can be used to + interrupt and cancel large incoming messages, or return untreatable + messages to their original queue. + + + The server SHOULD be capable of accepting and process the Reject + method while sending message content with a Deliver or Get-Ok + method. I.e. the server should read and process incoming methods + while sending output frames. To cancel a partially-send content, + the server sends a content body frame of size 1 (i.e. with no data + except the frame-end octet). + + + The server SHOULD interpret this method as meaning that the client + is unable to process the message at this time. + + + A client MUST NOT use this method as a means of selecting messages + to process. A rejected message MAY be discarded or dead-lettered, + not necessarily passed to another client. + + + + + + + requeue the message + + If this field is zero, the message will be discarded. If this bit + is 1, the server will attempt to requeue the message. + + + The server MUST NOT deliver the message to the same client within + the context of the current channel. The recommended strategy is + to attempt to deliver the message to an alternative consumer, and + if that is not possible, to move the message to a dead-letter + queue. The server MAY use more sophisticated tracking to hold + the message on the queue and redeliver it to the same client at + a later stage. + + + + + + redeliver unacknowledged messages. This method is only allowed on non-transacted channels. + + This method asks the broker to redeliver all unacknowledged messages on a + specifieid channel. Zero or more messages may be redelivered. + + + + + requeue the message + + If this field is zero, the message will be redelivered to the original recipient. If this bit + is 1, the server will attempt to requeue the message, potentially then delivering it to an + alternative subscriber. + + + + + The server MUST set the redelivered flag on all messages that are resent. + + + The server MUST raise a channel exception if this is called on a transacted channel. + + + + + + + + + + work with file content + + The file class provides methods that support reliable file transfer. + File messages have a specific set of properties that are required for + interoperability with file transfer applications. File messages and + acknowledgements are subject to channel transactions. Note that the + file class does not provide message browsing methods; these are not + compatible with the staging model. Applications that need browsable + file transfer should use Basic content and the Basic class. + + + + file = C:QOS S:QOS-OK + / C:CONSUME S:CONSUME-OK + / C:CANCEL S:CANCEL-OK + / C:OPEN S:OPEN-OK C:STAGE content + / S:OPEN C:OPEN-OK S:STAGE content + / C:PUBLISH + / S:DELIVER + / S:RETURN + / C:ACK + / C:REJECT + + + + + + + The server MUST make a best-effort to hold file messages on a + reliable storage mechanism. + + + The server MUST NOT discard a file message in case of a queue + overflow. The server MUST use the Channel.Flow method to slow or stop + a file message publisher when necessary. + + + The server MUST implement at least 2 priority levels for file + messages, where priorities 0-4 and 5-9 are treated as two distinct + levels. The server MAY implement up to 10 priority levels. + + + The server MUST support both automatic and explicit acknowledgements + on file content. + + + + + + MIME content type + + + MIME content encoding + + + Message header field table + + + The message priority, 0 to 9 + + + The destination to reply to + + + The application message identifier + + + The message filename + + + The message timestamp + + + Intra-cluster routing identifier + + + + + + + specify quality of service + + This method requests a specific quality of service. The QoS can + be specified for the current channel or for all channels on the + connection. The particular properties and semantics of a qos method + always depend on the content class semantics. Though the qos method + could in principle apply to both peers, it is currently meaningful + only for the server. + + + + + + prefetch window in octets + + The client can request that messages be sent in advance so that + when the client finishes processing a message, the following + message is already held locally, rather than needing to be sent + down the channel. Prefetching gives a performance improvement. + This field specifies the prefetch window size in octets. May be + set to zero, meaning "no specific limit". Note that other + prefetch limits may still apply. The prefetch-size is ignored + if the no-ack option is set. + + + + + prefetch window in messages + + Specifies a prefetch window in terms of whole messages. This + is compatible with some file API implementations. This field + may be used in combination with the prefetch-size field; a + message will only be sent in advance if both prefetch windows + (and those at the channel and connection level) allow it. + The prefetch-count is ignored if the no-ack option is set. + + + The server MAY send less data in advance than allowed by the + client's specified prefetch windows but it MUST NOT send more. + + + + + apply to entire connection + + By default the QoS settings apply to the current channel only. If + this field is set, they are applied to the entire connection. + + + + + + confirm the requested qos + + This method tells the client that the requested QoS levels could + be handled by the server. The requested QoS applies to all active + consumers until a new QoS is defined. + + + + + + + + start a queue consumer + + This method asks the server to start a "consumer", which is a + transient request for messages from a specific queue. Consumers + last as long as the channel they were created on, or until the + client cancels them. + + + The server SHOULD support at least 16 consumers per queue, unless + the queue was declared as private, and ideally, impose no limit + except as defined by available resources. + + + + + + + The client MUST provide a valid access ticket giving "read" access + rights to the realm for the queue. + + + + + + Specifies the name of the queue to consume from. If the queue name + is null, refers to the current queue for the channel, which is the + last declared queue. + + + If the client did not previously declare a queue, and the queue name + in this method is empty, the server MUST raise a connection exception + with reply code 530 (not allowed). + + + + + + Specifies the identifier for the consumer. The consumer tag is + local to a connection, so two clients can use the same consumer + tags. If this field is empty the server will generate a unique + tag. + + + The tag MUST NOT refer to an existing consumer. If the client + attempts to create two consumers with the same non-empty tag + the server MUST raise a connection exception with reply code + 530 (not allowed). + + + + + + + + + request exclusive access + + Request exclusive consumer access, meaning only this consumer can + access the queue. + + + If the server cannot grant exclusive access to the queue when asked, + - because there are other consumers active - it MUST raise a channel + exception with return code 405 (resource locked). + + + + + do not send a reply method + + If set, the server will not respond to the method. The client should + not wait for a reply method. If the server could not complete the + method it will raise a channel or connection exception. + + + + + + confirm a new consumer + + This method provides the client with a consumer tag which it MUST + use in methods that work with the consumer. + + + + + + Holds the consumer tag specified by the client or provided by + the server. + + + + + + + + + end a queue consumer + + This method cancels a consumer. This does not affect already + delivered messages, but it does mean the server will not send any + more messages for that consumer. + + + + + + + + do not send a reply method + + If set, the server will not respond to the method. The client should + not wait for a reply method. If the server could not complete the + method it will raise a channel or connection exception. + + + + + + confirm a cancelled consumer + + This method confirms that the cancellation was completed. + + + + + + + + + + + request to start staging + + This method requests permission to start staging a message. Staging + means sending the message into a temporary area at the recipient end + and then delivering the message by referring to this temporary area. + Staging is how the protocol handles partial file transfers - if a + message is partially staged and the connection breaks, the next time + the sender starts to stage it, it can restart from where it left off. + + + + + + + staging identifier + + This is the staging identifier. This is an arbitrary string chosen + by the sender. For staging to work correctly the sender must use + the same staging identifier when staging the same message a second + time after recovery from a failure. A good choice for the staging + identifier would be the SHA1 hash of the message properties data + (including the original filename, revised time, etc.). + + + + + message content size + + The size of the content in octets. The recipient may use this + information to allocate or check available space in advance, to + avoid "disk full" errors during staging of very large messages. + + + The sender MUST accurately fill the content-size field. + Zero-length content is permitted. + + + + + + confirm staging ready + + This method confirms that the recipient is ready to accept staged + data. If the message was already partially-staged at a previous + time the recipient will report the number of octets already staged. + + + + + + + already staged amount + + The amount of previously-staged content in octets. For a new + message this will be zero. + + + The sender MUST start sending data from this octet offset in the + message, counting from zero. + + + The recipient MAY decide how long to hold partially-staged content + and MAY implement staging by always discarding partially-staged + content. However if it uses the file content type it MUST support + the staging methods. + + + + + + + + stage message content + + This method stages the message, sending the message content to the + recipient from the octet offset specified in the Open-Ok method. + + + + + + + + + + publish a message + + This method publishes a staged file message to a specific exchange. + The file message will be routed to queues as defined by the exchange + configuration and distributed to any active consumers when the + transaction, if any, is committed. + + + + + + The client MUST provide a valid access ticket giving "write" + access rights to the access realm for the exchange. + + + + + + Specifies the name of the exchange to publish to. The exchange + name can be empty, meaning the default exchange. If the exchange + name is specified, and that exchange does not exist, the server + will raise a channel exception. + + + The server MUST accept a blank exchange name to mean the default + exchange. + + + If the exchange was declared as an internal exchange, the server + MUST respond with a reply code 403 (access refused) and raise a + channel exception. + + + The exchange MAY refuse file content in which case it MUST respond + with a reply code 540 (not implemented) and raise a channel + exception. + + + + + Message routing key + + Specifies the routing key for the message. The routing key is + used for routing messages depending on the exchange configuration. + + + + + indicate mandatory routing + + This flag tells the server how to react if the message cannot be + routed to a queue. If this flag is set, the server will return an + unroutable message with a Return method. If this flag is zero, the + server silently drops the message. + + + The server SHOULD implement the mandatory flag. + + + + + request immediate delivery + + This flag tells the server how to react if the message cannot be + routed to a queue consumer immediately. If this flag is set, the + server will return an undeliverable message with a Return method. + If this flag is zero, the server will queue the message, but with + no guarantee that it will ever be consumed. + + + The server SHOULD implement the immediate flag. + + + + + staging identifier + + This is the staging identifier of the message to publish. The + message must have been staged. Note that a client can send the + Publish method asynchronously without waiting for staging to + finish. + + + + + + return a failed message + + This method returns an undeliverable message that was published + with the "immediate" flag set, or an unroutable message published + with the "mandatory" flag set. The reply code and text provide + information about the reason that the message was undeliverable. + + + + + + + + + Specifies the name of the exchange that the message was + originally published to. + + + + + Message routing key + + Specifies the routing key name specified when the message was + published. + + + + + + + + + notify the client of a consumer message + + This method delivers a staged file message to the client, via a + consumer. In the asynchronous message delivery model, the client + starts a consumer using the Consume method, then the server + responds with Deliver methods as and when messages arrive for + that consumer. + + + The server SHOULD track the number of times a message has been + delivered to clients and when a message is redelivered a certain + number of times - e.g. 5 times - without being acknowledged, the + server SHOULD consider the message to be unprocessable (possibly + causing client applications to abort), and move the message to a + dead letter queue. + + + + + + + + + + + + Specifies the name of the exchange that the message was originally + published to. + + + + + Message routing key + + Specifies the routing key name specified when the message was + published. + + + + + staging identifier + + This is the staging identifier of the message to deliver. The + message must have been staged. Note that a server can send the + Deliver method asynchronously without waiting for staging to + finish. + + + + + + + + + acknowledge one or more messages + + This method acknowledges one or more messages delivered via the + Deliver method. The client can ask to confirm a single message or + a set of messages up to and including a specific message. + + + + + + acknowledge multiple messages + + If set to 1, the delivery tag is treated as "up to and including", + so that the client can acknowledge multiple messages with a single + method. If set to zero, the delivery tag refers to a single + message. If the multiple field is 1, and the delivery tag is zero, + tells the server to acknowledge all outstanding mesages. + + + The server MUST validate that a non-zero delivery-tag refers to an + delivered message, and raise a channel exception if this is not the + case. + + + + + + + + + reject an incoming message + + This method allows a client to reject a message. It can be used to + return untreatable messages to their original queue. Note that file + content is staged before delivery, so the client will not use this + method to interrupt delivery of a large message. + + + The server SHOULD interpret this method as meaning that the client + is unable to process the message at this time. + + + A client MUST NOT use this method as a means of selecting messages + to process. A rejected message MAY be discarded or dead-lettered, + not necessarily passed to another client. + + + + + + + requeue the message + + If this field is zero, the message will be discarded. If this bit + is 1, the server will attempt to requeue the message. + + + The server MUST NOT deliver the message to the same client within + the context of the current channel. The recommended strategy is + to attempt to deliver the message to an alternative consumer, and + if that is not possible, to move the message to a dead-letter + queue. The server MAY use more sophisticated tracking to hold + the message on the queue and redeliver it to the same client at + a later stage. + + + + + + + + + work with streaming content + + + The stream class provides methods that support multimedia streaming. + The stream class uses the following semantics: one message is one + packet of data; delivery is unacknowleged and unreliable; the consumer + can specify quality of service parameters that the server can try to + adhere to; lower-priority messages may be discarded in favour of high + priority messages. + + + + stream = C:QOS S:QOS-OK + / C:CONSUME S:CONSUME-OK + / C:CANCEL S:CANCEL-OK + / C:PUBLISH content + / S:RETURN + / S:DELIVER content + + + + + + + The server SHOULD discard stream messages on a priority basis if + the queue size exceeds some configured limit. + + + The server MUST implement at least 2 priority levels for stream + messages, where priorities 0-4 and 5-9 are treated as two distinct + levels. The server MAY implement up to 10 priority levels. + + + The server MUST implement automatic acknowledgements on stream + content. That is, as soon as a message is delivered to a client + via a Deliver method, the server must remove it from the queue. + + + + + + + MIME content type + + + MIME content encoding + + + Message header field table + + + The message priority, 0 to 9 + + + The message timestamp + + + + + + + specify quality of service + + This method requests a specific quality of service. The QoS can + be specified for the current channel or for all channels on the + connection. The particular properties and semantics of a qos method + always depend on the content class semantics. Though the qos method + could in principle apply to both peers, it is currently meaningful + only for the server. + + + + + + prefetch window in octets + + The client can request that messages be sent in advance so that + when the client finishes processing a message, the following + message is already held locally, rather than needing to be sent + down the channel. Prefetching gives a performance improvement. + This field specifies the prefetch window size in octets. May be + set to zero, meaning "no specific limit". Note that other + prefetch limits may still apply. + + + + + prefetch window in messages + + Specifies a prefetch window in terms of whole messages. This + field may be used in combination with the prefetch-size field; + a message will only be sent in advance if both prefetch windows + (and those at the channel and connection level) allow it. + + + + + transfer rate in octets/second + + Specifies a desired transfer rate in octets per second. This is + usually determined by the application that uses the streaming + data. A value of zero means "no limit", i.e. as rapidly as + possible. + + + The server MAY ignore the prefetch values and consume rates, + depending on the type of stream and the ability of the server + to queue and/or reply it. The server MAY drop low-priority + messages in favour of high-priority messages. + + + + + apply to entire connection + + By default the QoS settings apply to the current channel only. If + this field is set, they are applied to the entire connection. + + + + + + confirm the requested qos + + This method tells the client that the requested QoS levels could + be handled by the server. The requested QoS applies to all active + consumers until a new QoS is defined. + + + + + + + + start a queue consumer + + This method asks the server to start a "consumer", which is a + transient request for messages from a specific queue. Consumers + last as long as the channel they were created on, or until the + client cancels them. + + + The server SHOULD support at least 16 consumers per queue, unless + the queue was declared as private, and ideally, impose no limit + except as defined by available resources. + + + Streaming applications SHOULD use different channels to select + different streaming resolutions. AMQP makes no provision for + filtering and/or transforming streams except on the basis of + priority-based selective delivery of individual messages. + + + + + + + The client MUST provide a valid access ticket giving "read" access + rights to the realm for the queue. + + + + + + Specifies the name of the queue to consume from. If the queue name + is null, refers to the current queue for the channel, which is the + last declared queue. + + + If the client did not previously declare a queue, and the queue name + in this method is empty, the server MUST raise a connection exception + with reply code 530 (not allowed). + + + + + + Specifies the identifier for the consumer. The consumer tag is + local to a connection, so two clients can use the same consumer + tags. If this field is empty the server will generate a unique + tag. + + + The tag MUST NOT refer to an existing consumer. If the client + attempts to create two consumers with the same non-empty tag + the server MUST raise a connection exception with reply code + 530 (not allowed). + + + + + + + request exclusive access + + Request exclusive consumer access, meaning only this consumer can + access the queue. + + + If the server cannot grant exclusive access to the queue when asked, + - because there are other consumers active - it MUST raise a channel + exception with return code 405 (resource locked). + + + + + do not send a reply method + + If set, the server will not respond to the method. The client should + not wait for a reply method. If the server could not complete the + method it will raise a channel or connection exception. + + + + + + + confirm a new consumer + + This method provides the client with a consumer tag which it may + use in methods that work with the consumer. + + + + + + Holds the consumer tag specified by the client or provided by + the server. + + + + + + + + end a queue consumer + + This method cancels a consumer. Since message delivery is + asynchronous the client may continue to receive messages for + a short while after canceling a consumer. It may process or + discard these as appropriate. + + + + + + + + do not send a reply method + + If set, the server will not respond to the method. The client should + not wait for a reply method. If the server could not complete the + method it will raise a channel or connection exception. + + + + + + confirm a cancelled consumer + + This method confirms that the cancellation was completed. + + + + + + + + + + + publish a message + + This method publishes a message to a specific exchange. The message + will be routed to queues as defined by the exchange configuration + and distributed to any active consumers as appropriate. + + + + + + The client MUST provide a valid access ticket giving "write" + access rights to the access realm for the exchange. + + + + + + Specifies the name of the exchange to publish to. The exchange + name can be empty, meaning the default exchange. If the exchange + name is specified, and that exchange does not exist, the server + will raise a channel exception. + + + The server MUST accept a blank exchange name to mean the default + exchange. + + + If the exchange was declared as an internal exchange, the server + MUST respond with a reply code 403 (access refused) and raise a + channel exception. + + + The exchange MAY refuse stream content in which case it MUST + respond with a reply code 540 (not implemented) and raise a + channel exception. + + + + + Message routing key + + Specifies the routing key for the message. The routing key is + used for routing messages depending on the exchange configuration. + + + + + indicate mandatory routing + + This flag tells the server how to react if the message cannot be + routed to a queue. If this flag is set, the server will return an + unroutable message with a Return method. If this flag is zero, the + server silently drops the message. + + + The server SHOULD implement the mandatory flag. + + + + + request immediate delivery + + This flag tells the server how to react if the message cannot be + routed to a queue consumer immediately. If this flag is set, the + server will return an undeliverable message with a Return method. + If this flag is zero, the server will queue the message, but with + no guarantee that it will ever be consumed. + + + The server SHOULD implement the immediate flag. + + + + + + return a failed message + + This method returns an undeliverable message that was published + with the "immediate" flag set, or an unroutable message published + with the "mandatory" flag set. The reply code and text provide + information about the reason that the message was undeliverable. + + + + + + + + + Specifies the name of the exchange that the message was + originally published to. + + + + + Message routing key + + Specifies the routing key name specified when the message was + published. + + + + + + + + + notify the client of a consumer message + + This method delivers a message to the client, via a consumer. In + the asynchronous message delivery model, the client starts a + consumer using the Consume method, then the server responds with + Deliver methods as and when messages arrive for that consumer. + + + + + + + + + + Specifies the name of the exchange that the message was originally + published to. + + + + + + Specifies the name of the queue that the message came from. Note + that a single channel can start many consumers on different + queues. + + + + + + + + + work with standard transactions + + + Standard transactions provide so-called "1.5 phase commit". We can + ensure that work is never lost, but there is a chance of confirmations + being lost, so that messages may be resent. Applications that use + standard transactions must be able to detect and ignore duplicate + messages. + + + An client using standard transactions SHOULD be able to track all + messages received within a reasonable period, and thus detect and + reject duplicates of the same message. It SHOULD NOT pass these to + the application layer. + + + tx = C:SELECT S:SELECT-OK + / C:COMMIT S:COMMIT-OK + / C:ROLLBACK S:ROLLBACK-OK + + + + + +select standard transaction mode + + This method sets the channel to use standard transactions. The + client must use this method at least once on a channel before + using the Commit or Rollback methods. + + + + + +confirm transaction mode + + This method confirms to the client that the channel was successfully + set to use standard transactions. + + + + + +commit the current transaction + + This method commits all messages published and acknowledged in + the current transaction. A new transaction starts immediately + after a commit. + + + + + +confirm a successful commit + + This method confirms to the client that the commit succeeded. + Note that if a commit fails, the server raises a channel exception. + + + + + +abandon the current transaction + + This method abandons all messages published and acknowledged in + the current transaction. A new transaction starts immediately + after a rollback. + + + + + +confirm a successful rollback + + This method confirms to the client that the rollback succeeded. + Note that if an rollback fails, the server raises a channel exception. + + + + + + + work with distributed transactions + + + Distributed transactions provide so-called "2-phase commit". The + AMQP distributed transaction model supports the X-Open XA + architecture and other distributed transaction implementations. + The Dtx class assumes that the server has a private communications + channel (not AMQP) to a distributed transaction coordinator. + + + dtx = C:SELECT S:SELECT-OK + C:START S:START-OK + + + + + +select standard transaction mode + + This method sets the channel to use distributed transactions. The + client must use this method at least once on a channel before + using the Start method. + + + + + +confirm transaction mode + + This method confirms to the client that the channel was successfully + set to use distributed transactions. + + + + + + start a new distributed transaction + + This method starts a new distributed transaction. This must be + the first method on a new channel that uses the distributed + transaction mode, before any methods that publish or consume + messages. + + + + + transaction identifier + + The distributed transaction key. This identifies the transaction + so that the AMQP server can coordinate with the distributed + transaction coordinator. + + + + + + confirm the start of a new distributed transaction + + This method confirms to the client that the transaction started. + Note that if a start fails, the server raises a channel exception. + + + + + + + methods for protocol tunneling. + + + The tunnel methods are used to send blocks of binary data - which + can be serialised AMQP methods or other protocol frames - between + AMQP peers. + + + tunnel = C:REQUEST + / S:REQUEST + + + + + Message header field table + + + The identity of the tunnelling proxy + + + The name or type of the message being tunnelled + + + The message durability indicator + + + The message broadcast mode + + + + sends a tunnelled method + + This method tunnels a block of binary data, which can be an + encoded AMQP method or other data. The binary data is sent + as the content for the Tunnel.Request method. + + + + meta data for the tunnelled block + + This field table holds arbitrary meta-data that the sender needs + to pass to the recipient. + + + + + + + test functional primitives of the implementation + + + The test class provides methods for a peer to test the basic + operational correctness of another peer. The test methods are + intended to ensure that all peers respect at least the basic + elements of the protocol, such as frame and content organisation + and field types. We assume that a specially-designed peer, a + "monitor client" would perform such tests. + + + test = C:INTEGER S:INTEGER-OK + / S:INTEGER C:INTEGER-OK + / C:STRING S:STRING-OK + / S:STRING C:STRING-OK + / C:TABLE S:TABLE-OK + / S:TABLE C:TABLE-OK + / C:CONTENT S:CONTENT-OK + / S:CONTENT C:CONTENT-OK + + + + + + test integer handling + + This method tests the peer's capability to correctly marshal integer + data. + + + + + + octet test value + + An octet integer test value. + + + + short test value + + A short integer test value. + + + + long test value + + A long integer test value. + + + + long-long test value + + A long long integer test value. + + + + operation to test + + The client must execute this operation on the provided integer + test fields and return the result. + + + return sum of test values + return lowest of test values + return highest of test values + + + + + report integer test result + + This method reports the result of an Integer method. + + + + + result value + + The result of the tested operation. + + + + + + test string handling + + This method tests the peer's capability to correctly marshal string + data. + + + + + + short string test value + + An short string test value. + + + + long string test value + + A long string test value. + + + + operation to test + + The client must execute this operation on the provided string + test fields and return the result. + + + return concatentation of test strings + return shortest of test strings + return longest of test strings + + + + + report string test result + + This method reports the result of a String method. + + + + + result value + + The result of the tested operation. + + + + + + test field table handling + + This method tests the peer's capability to correctly marshal field + table data. + + + + + + field table of test values + + A field table of test values. + + + + operation to test on integers + + The client must execute this operation on the provided field + table integer values and return the result. + + + return sum of numeric field values + return min of numeric field values + return max of numeric field values + + + + operation to test on strings + + The client must execute this operation on the provided field + table string values and return the result. + + + return concatenation of string field values + return shortest of string field values + return longest of string field values + + + + + report table test result + + This method reports the result of a Table method. + + + + + integer result value + + The result of the tested integer operation. + + + + string result value + + The result of the tested string operation. + + + + + + test content handling + + This method tests the peer's capability to correctly marshal content. + + + + + + + report content test result + + This method reports the result of a Content method. It contains the + content checksum and echoes the original content as provided. + + + + + content hash + + The 32-bit checksum of the content, calculated by adding the + content into a 32-bit accumulator. + + + + + diff --git a/config.json b/config.json new file mode 100644 index 0000000..14d8f02 --- /dev/null +++ b/config.json @@ -0,0 +1,7 @@ +{ + host: 'localhost', + port: '5672', + user: 'guest', + pass: 'guest', + vhost: '/' +} diff --git a/lib/RabbitFoot.pm b/lib/RabbitFoot.pm new file mode 100644 index 0000000..8ddbe34 --- /dev/null +++ b/lib/RabbitFoot.pm @@ -0,0 +1,657 @@ +package RabbitFoot; + +use Data::Dumper; +use List::MoreUtils qw(none); +use IO::Socket::INET; +use Sys::SigAction qw(timeout_call); +use Net::AMQP; +use Net::AMQP::Common qw(:all); + +use Moose; + +our $VERSION = '0.01'; + +has verbose => ( + isa => 'Bool', + is => 'rw', +); + +has timeout => ( + isa => 'Int', + is => 'rw', + default => 1, +); + +has _socket => ( + isa => 'IO::Socket::INET', + is => 'rw', + clearer => 'clear_socket', +); + +has _is_open => ( + isa => 'Bool', + is => 'rw', + default => 0, +); + +has _is_oepn_channel => ( + isa => 'Bool', + is => 'rw', + default => 0, +); + +has _consume_tag => ( + isa => 'Str', + is => 'rw', + default => '', +); + +__PACKAGE__->meta->make_immutable; +no Moose; + +sub load_xml_spec { + my ($self, $file,) = @_; + + eval { + Net::AMQP::Protocol->load_xml_spec($file); + }; + die $@, "\n" if $@; + + return $self; +} + +sub connect { + my ($self, $args) = @_; + + eval { + $self->_connect( + $args, + )->_start( + $args, + )->_tune( + )->_open( + $args, + )->_open_channel( + ); + }; + + return $self if !$@; + + my $exception = $@; + $self->close(); + die $exception; +} + +sub _connect { + my ($self, $args,) = @_; + + if ($self->verbose) { + print STDERR 'connect to ', $args->{host}, ':', $args->{port}, '...', "\n"; + } + + my $socket = IO::Socket::INET->new( + Proto => 'tcp', + PeerAddr => $args->{host}, + PeerPort => $args->{port}, + Timeout => $self->timeout, + ) or die 'Error connecting to AMQP Server!', "\n"; + + $self->_socket($socket); + return $self; +} + +sub _start { + my ($self, $args,) = @_; + + if ($self->verbose) { + print STDERR 'post header', "\n"; + } + + print {$self->_socket} Net::AMQP::Protocol->header; + + my $frame = $self->_read_and_valid('Connection::Start'); + + my @mechanisms = split /\s/, $frame->method_frame->mechanisms; + die 'AMQPLAIN is not found in mechanisms', "\n" + if none {$_ eq 'AMQPLAIN'} @mechanisms; + + my @locales = split /\s/, $frame->method_frame->locales; + die 'en_US is not found in locales', "\n" + if none {$_ eq 'en_US'} @locales; + + $self->_post( + Net::AMQP::Protocol::Connection::StartOk->new( + client_properties => { + platform => 'Perl', + product => __PACKAGE__, + information => 'http://d.hatena.ne.jp/cooldaemon/', + version => '0.01', + }, + mechanism => 'AMQPLAIN', + response => { + LOGIN => $args->{user}, + PASSWORD => $args->{pass}, + }, + locale => 'en_US', + ), + ); + + return $self; +} + +sub _tune { + my ($self,) = @_; + + my $frame = $self->_read_and_valid('Connection::Tune'); + + $self->_post( + Net::AMQP::Protocol::Connection::TuneOk->new( + channel_max => $frame->method_frame->channel_max, + frame_max => $frame->method_frame->frame_max, + heartbeat => $frame->method_frame->heartbeat, + ), + ); + + return $self; +} + +sub _open { + my ($self, $args,) = @_; + + $self->_post_and_read( + 'Connection::Open', + { + virtual_host => $args->{vhost}, + capabilities => '', + insist => 1, + }, + 'Connection::OpenOk', + ); + $self->_is_open(1); + + return $self; +} + +sub close { + my ($self,) = @_; + + for my $method (qw(cancel _close_channel _close _disconnect)) { + eval {$self->$method()}; + } + + return $self; +} + +sub _close { + my ($self,) = @_; + + return $self if !$self->_is_open; + + $self->_post_and_read('Connection::Close', {}, 'Connection::CloseOk',); + $self->_is_open(0); + + return $self; +} + +sub _disconnect { + my ($self,) = @_; + + return if !$self->_socket; + + CORE::close $self->_socket; + $self->clear_socket; + + return; +} + +sub _open_channel { + my ($self) = @_; + + $self->_post_and_read('Channel::Open', {}, 'Channel::OpenOk', 1,); + $self->_is_oepn_channel(1); + + return $self; +} + +sub _close_channel { + my ($self,) = @_; + + return $self if !$self->_is_oepn_channel; + + $self->_post_and_read('Channel::Close', {}, 'Channel::CloseOk', 1,); + $self->_is_oepn_channel(0); + + return $self; +} + +sub declare_exchange { + my ($self, $args,) = @_; + + return $self->_post_and_read( + 'Exchange::Declare', + { + type => 'direct', + passive => 0, + durable => 0, + auto_delete => 0, + internal => 0, + %$args, # exchange + ticket => 0, + nowait => 0, + }, + 'Exchange::DeclareOk', + 1, + ); +} + +sub delete_exchange { + my ($self, $args,) = @_; + + return $self->_post_and_read( + 'Exchange::Delete', + { + if_unused => 0, + %$args, # exchange + ticket => 0, + nowait => 0, + }, + 'Exchange::DeleteOk', + 1, + ); +} + +sub declare_queue { + my ($self, $args,) = @_; + + return $self->_post_and_read( + 'Queue::Declare', + { + queue => '', + passive => 0, + durable => 0, + exclusive => 0, + auto_delete => 0, + %$args, + ticket => 0, + no_ack => 1, + nowait => 0, + }, + 'Queue::DeclareOk', + 1, + ); +} + +sub bind_queue { + my ($self, $args,) = @_; + + return $self->_post_and_read( + 'Queue::Bind', + { + %$args, # queue, exchange, routing_key + ticket => 0, + nowait => 0, + }, + 'Queue::BindOk', + 1, + ); +} + +sub purge_queue { + my ($self, $args,) = @_; + + return $self->_post_and_read( + 'Queue::Purge', + { + %$args, # queue + ticket => 0, + nowait => 0, + }, + 'Queue::PurgeOk', + 1, + ); +} + +sub delete_queue { + my ($self, $args,) = @_; + + return $self->_post_and_read( + 'Queue::Delete', + { + if_unused => 0, + if_empty => 0, + %$args, # queue + ticket => 0, + nowait => 0, + }, + 'Queue::DeleteOk', + 1, + ); +} + +sub publish { + my ($self, $publish_args, $header_args, $message,) = @_; + + $self->_publish( + $publish_args, + )->_header( + $header_args, $message, + )->_body( + $message, + ); + + return if !$publish_args->{mandatory} && !$publish_args->{immediate}; + + my ($frame) = $self->_read; + return $frame; +} + +sub _publish { + my ($self, $args,) = @_; + + $self->_post( + Net::AMQP::Protocol::Basic::Publish->new( + exchange => '', + mandatory => 0, + immediate => 0, + %$args, # routing_key + ticket => 0, + ), + 1, + ); + + return $self; +} + +sub _header { + my ($self, $args, $message,) = @_; + + $self->_post( + Net::AMQP::Frame::Header->new( + weight => $args->{weight} || 0, + body_size => length($message), + header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new( + content_type => 'application/octet-stream', + content_encoding => '', + headers => {}, + delivery_mode => 1, + priority => 1, + correlation_id => '', + reply_to => '', + expiration => '', + message_id => '', + timestamp => time, + type => '', + user_id => '', + app_id => '', + cluster_id => '', + %$args, + ), + ), + 1, + ); + + return $self; +} + +sub _body { + my ($self, $message,) = @_; + $self->_post(Net::AMQP::Frame::Body->new(payload => $message), 1); + return $self; +} + +sub consume { + my ($self, $args,) = @_; + + die 'Has already been consuming message', "\n" if $self->_consume_tag; + + my $frame = $self->_post_and_read( + 'Basic::Consume', + { + consumer_tag => '', + no_local => 0, + no_ack => 1, + exclusive => 0, + %$args, # queue + ticket => 0, + nowait => 0, + }, + 'Basic::ConsumeOk', + 1, + ); + + $self->_consume_tag($frame->method_frame->consumer_tag); + return $frame; +} + +sub cancel { + my ($self,) = @_; + + return if !$self->_consume_tag; + + my $frame = $self->_post_and_read( + 'Basic::Cancel', + { + consumer_tag => $self->_consume_tag, + nowait => 0, + }, + 'Basic::CancelOk', + 1, + ); + + $self->_consume_tag(''); + return $frame; +} + +sub poll { + my ($self, $args) = @_; + + my $timeout = $args && $args->{timeout} ? $args->{timeout} : 'infinite'; + $self->_read_and_valid('Basic::Deliver', $timeout); + + return { + header => $self->_read_header_and_valid(), + body => $self->_read_body_and_valid(), + }; +} + +sub _post_and_read { + my ($self, $method, $args, $exp, $id,) = @_; + + $method = 'Net::AMQP::Protocol::' . $method; + $self->_post( + Net::AMQP::Frame::Method->new( + method_frame => $method->new(%$args) + ), + $id, + ); + return $self->_read_and_valid($exp); +} + +sub _read_and_valid { + my ($self, $exp, $timeout,) = @_; + + my ($frame) = $self->_read($timeout); + die 'Received data is not method frame', "\n" + if !$frame->isa('Net::AMQP::Frame::Method'); + + my $method_frame = $frame->method_frame; + return $frame if $method_frame->isa('Net::AMQP::Protocol::' . $exp); + + $self->_check_close_and_clean($frame); + die 'Method is not ', $exp, "\n", 'Method was ', ref $method_frame, "\n" + if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close'); +} + +sub _read_header_and_valid { + my ($self,) = @_; + + my ($frame) = $self->_read(); + if (!$frame->isa('Net::AMQP::Frame::Header')) { + $self->_check_close_and_cleanup($frame); + die 'Received data is not header frame', "\n"; + } + + my $header_frame = $frame->header_frame; + die 'Header is not Protocol::Basic::ContentHeader', + 'Header was ', ref $header_frame, "\n" + if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader'); + + return $frame; +} + +sub _read_body_and_valid { + my ($self,) = @_; + + my ($frame) = $self->_read(); + return $frame if $frame->isa('Net::AMQP::Frame::Body'); + + $self->_check_close_and_cleanup($frame); + die 'Received data is not body frame', "\n"; +} + +sub _check_close_and_clean { + my ($self, $frame,) = @_; + + return $self if !$frame->isa('Net::AMQP::Frame::Method'); + + my $method_frame = $frame->method_frame; + + if ($method_frame->isa('Net::AMQP::Protocol::Connection::Close')) { + $self->_is_oepn_channel(0); + $self->_post(Net::AMQP::Protocol::Connection::CloseOk->new()); + $self->_is_open(0); + $self->_disconnect(); + die $method_frame->reply_code, ' ', $method_frame->reply_text, "\n"; + } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) { + $self->_post(Net::AMQP::Protocol::Channel::CloseOk->new(), 1); + $self->_is_oepn_channel(0); + $self->_close()->_disconnect(); + die $method_frame->reply_code, ' ', $method_frame->reply_text, "\n"; + } + + return $self; +} + +sub _read { + my ($self, $timeout,) = @_; + + $timeout ||= $self->timeout; + + my @frames; + + if ($timeout eq 'infinite') { + @frames = $self->_do_read(); + } else { + if (timeout_call($timeout, sub {@frames = $self->_do_read()})) { + die 'Read timed out after', $timeout, "\n"; + } + } + + return @frames; +} + +sub _do_read { + my ($self,) = @_; + + my $stack; + my $data; + + read $self->_socket, $data, 8; + if (length($data) <= 0) { + die 'Disconnect', "\n"; + } + + $stack .= $data; + + my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, ''; + if (!defined $type_id || !defined $channel || !defined $length) { + die 'Broken data was received', "\n"; + } + + while ($length > 0) { + $length -= read $self->_socket, $data, $length; + $stack .= $data; + } + + my @frames = Net::AMQP->parse_raw_frames(\$stack); + if ($self->verbose) { + print STDERR '[C] <-- [S] ' . Dumper(\@frames); + print STDERR '-----------', "\n"; + } + + return @frames; +} + +sub _post { + my ($self, $output, $id,) = @_; + + if ($output->isa('Net::AMQP::Protocol::Base')) { + $output = $output->frame_wrap; + } + $output->channel($id || 0); + + if ($self->verbose) { + print STDERR '[C] --> [S] ', Dumper($output), "\n"; + } + print {$self->_socket} $output->to_raw_frame(); + + return; +} + +sub DEMOLISH { + my ($self) = @_; + + $self->close(); + return; +} + +1; +__END__ + +=head1 NAME + +RabbitFoot - A synchronous and single channel Perl AMQP client. + +=head1 SYNOPSIS + + use RabbitFoot; + + my $rf = RabbitFoot->new({ + verbose => $self->verbose, + timeout => 1, + })->load_xml_spec( + '/path/to/amqp0-8.xml', + )->connect({ + host => 'localhosti', + port => 5672, + user => 'guest', + port => 'guest', + vhost => '/', + }); + + $rf->declare_exchange({exchange => 'test_exchange'}); + +=head1 DESCRIPTION + +RabbitFoot is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in a synchronous fashion. + +You can use RabbitFoot to - + + * Declare and delete exchanges + * Declare, delete and bind queues + * Publish and consume messages + +RabbitFoot is known to work with RabbitMQ versions 1.7.0 and version 0-8 of the AMQP specification. + +=head1 AUTHOR + +Masahito Ikuta Ecooldaemon@gmail.comE + +=head1 SEE ALSO + +=head1 LICENSE + +This library is free software; you can redistribute it and/or modify +it under the same terms as Perl itself. + +=cut diff --git a/lib/RabbitFoot/Cmd.pm b/lib/RabbitFoot/Cmd.pm new file mode 100644 index 0000000..293a39f --- /dev/null +++ b/lib/RabbitFoot/Cmd.pm @@ -0,0 +1,9 @@ +package RabbitFoot::Cmd; + +use Moose; +extends qw(MooseX::App::Cmd); +__PACKAGE__->meta->make_immutable; +no Moose; + +1; + diff --git a/lib/RabbitFoot/Cmd/Command/bind_queue.pm b/lib/RabbitFoot/Cmd/Command/bind_queue.pm new file mode 100644 index 0000000..9763832 --- /dev/null +++ b/lib/RabbitFoot/Cmd/Command/bind_queue.pm @@ -0,0 +1,74 @@ +package RabbitFoot::Cmd::Command::bind_queue; + +use Moose; +extends qw(MooseX::App::Cmd::Command); +with qw(RabbitFoot::Cmd::Role::Config RabbitFoot::Cmd::Role::Command); + +has queue => ( + isa => 'Str', + is => 'rw', + required => 1, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'q', + documentation => 'queue name', +); + +has exchange => ( + isa => 'Str', + is => 'rw', + required => 1, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'e', + documentation => 'exchange name', +); + +has routing_key => ( + isa => 'Str', + is => 'rw', + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'r', + documentation => 'message routing key', +); + +__PACKAGE__->meta->make_immutable; +no Moose; + +sub abstract { + return 'bind queue to an exchange'; +} + +sub _validate_queue { + my ($self,) = @_; + + $self->_check_queue(); + return; +} + +sub _validate_exchange { + my ($self,) = @_; + + $self->_check_shortstr('exchange'); + return; +} + +sub _validate_routing_key { + my ($self,) = @_; + + return if !$self->routing_key; + die 'routing_key', "\n" if 255 < length($self->routing_key); + return; +} + +sub _run { + my ($self, $client, $opt, $args,) = @_; + + my $method_frame = $client->bind_queue({ + (map {$_ => $self->$_} qw(queue exchange routing_key)) + })->method_frame; + + print 'Bound queue to exchange', "\n"; + return; +} + +1; + diff --git a/lib/RabbitFoot/Cmd/Command/declare_exchange.pm b/lib/RabbitFoot/Cmd/Command/declare_exchange.pm new file mode 100644 index 0000000..35ce128 --- /dev/null +++ b/lib/RabbitFoot/Cmd/Command/declare_exchange.pm @@ -0,0 +1,98 @@ +package RabbitFoot::Cmd::Command::declare_exchange; + +use List::MoreUtils qw(none); +use Moose; + +extends qw(MooseX::App::Cmd::Command); +with qw(RabbitFoot::Cmd::Role::Config RabbitFoot::Cmd::Role::Command); + +has exchange => ( + isa => 'Str', + is => 'rw', + default => '', + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'e', + documentation => 'exchange name', +); + +has type => ( + isa => 'Str', + is => 'rw', + default => 'direct', + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 't', + documentation => 'exchange type [direct|topic|fanout|headers]', +); + +has passive => ( + isa => 'Bool', + is => 'rw', + default => 0, + metaclass => 'MooseX::Getopt::Meta::Attribute', +# cmd_aliases => 'p', + documentation => 'do not create exchange', +); + +has durable => ( + isa => 'Bool', + is => 'rw', + default => 0, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'd', + documentation => 'request a durable exchange', +); + +has auto_delete => ( + isa => 'Bool', + is => 'rw', + default => 0, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'a', + documentation => 'auto delete exchange when unused', +); + +has internal => ( + isa => 'Bool', + is => 'rw', + default => 0, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'i', + documentation => 'create internal exchange', +); + +__PACKAGE__->meta->make_immutable; +no Moose; + +sub abstract { + return 'declare exchange, create if needed'; +} + +sub _validate_exchange { + my ($self,) = @_; + + return if !$self->exchange; + $self->_check_shortstr('exchange'); + return; +} + +sub _validate_type { + my ($self,) = @_; + + die 'type', "\n" + if none {$_ eq $self->type} qw(direct topic fanout headers); + return; +} + +sub _run { + my ($self, $client, $opt, $args,) = @_; + + my $method_frame = $client->declare_exchange({ + (map {$_ => $self->$_} qw(exchange type passive durable auto_delete internal)) + })->method_frame; + + print 'Declared exchange', "\n"; + return; +} + +1; + diff --git a/lib/RabbitFoot/Cmd/Command/declare_queue.pm b/lib/RabbitFoot/Cmd/Command/declare_queue.pm new file mode 100644 index 0000000..67f658e --- /dev/null +++ b/lib/RabbitFoot/Cmd/Command/declare_queue.pm @@ -0,0 +1,82 @@ +package RabbitFoot::Cmd::Command::declare_queue; + +use Moose; +extends qw(MooseX::App::Cmd::Command); +with qw(RabbitFoot::Cmd::Role::Config RabbitFoot::Cmd::Role::Command); + +has queue => ( + isa => 'Str', + is => 'rw', + default => '', + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'q', + documentation => 'queue name', +); + +has passive => ( + isa => 'Bool', + is => 'rw', + default => 0, + metaclass => 'MooseX::Getopt::Meta::Attribute', +# cmd_aliases => 'p', + documentation => 'do not create queue', +); + +has durable => ( + isa => 'Bool', + is => 'rw', + default => 0, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'd', + documentation => 'request a durable queue', +); + +has exclusive => ( + isa => 'Bool', + is => 'rw', + default => 0, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'e', + documentation => 'request an exclusive queue', +); + +has auto_delete => ( + isa => 'Bool', + is => 'rw', + default => 0, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'a', + documentation => 'auto delete queue when unused', +); + +__PACKAGE__->meta->make_immutable; +no Moose; + +sub abstract { + return 'declare queue, create if needed'; +} + +sub _validate_queue { + my ($self,) = @_; + + return if !$self->queue; + $self->_check_queue(); + return; +} + +sub _run { + my ($self, $client, $opt, $args,) = @_; + + my $method_frame = $client->declare_queue({ + (map {$_ => $self->$_} qw(queue passive durable exclusive auto_delete)) + })->method_frame; + + print 'Declared queue', "\n"; + for my $method (qw(queue message_count consumer_count)) { + print $method, ': ', $method_frame->$method, "\n"; + } + return; +} + +1; + diff --git a/lib/RabbitFoot/Cmd/Command/delete_exchange.pm b/lib/RabbitFoot/Cmd/Command/delete_exchange.pm new file mode 100644 index 0000000..ee2a09d --- /dev/null +++ b/lib/RabbitFoot/Cmd/Command/delete_exchange.pm @@ -0,0 +1,51 @@ +package RabbitFoot::Cmd::Command::delete_exchange; + +use Moose; +extends qw(MooseX::App::Cmd::Command); +with qw(RabbitFoot::Cmd::Role::Config RabbitFoot::Cmd::Role::Command); + +has exchange => ( + isa => 'Str', + is => 'rw', + required => 1, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'q', + documentation => 'exchange name', +); + +has if_unused => ( + isa => 'Bool', + is => 'rw', + default => 0, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'U', + documentation => 'delete only if unused', +); + +__PACKAGE__->meta->make_immutable; +no Moose; + +sub abstract { + return 'delete a exchange'; +} + +sub _validate_exchange { + my ($self,) = @_; + + $self->_check_shortstr('exchange'); + return; +} + +sub _run { + my ($self, $client, $opt, $args,) = @_; + + my $method_frame = $client->delete_exchange({ + (map {$_ => $self->$_} qw(exchange if_unused)) + })->method_frame; + + print 'Deleted exchange', "\n"; + return; +} + +1; + diff --git a/lib/RabbitFoot/Cmd/Command/delete_queue.pm b/lib/RabbitFoot/Cmd/Command/delete_queue.pm new file mode 100644 index 0000000..124c713 --- /dev/null +++ b/lib/RabbitFoot/Cmd/Command/delete_queue.pm @@ -0,0 +1,61 @@ +package RabbitFoot::Cmd::Command::delete_queue; + +use Moose; +extends qw(MooseX::App::Cmd::Command); +with qw(RabbitFoot::Cmd::Role::Config RabbitFoot::Cmd::Role::Command); + +has queue => ( + isa => 'Str', + is => 'rw', + required => 1, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'q', + documentation => 'queue name', +); + +has if_unused => ( + isa => 'Bool', + is => 'rw', + default => 0, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'U', + documentation => 'delete only if unused', +); + +has if_empty => ( + isa => 'Bool', + is => 'rw', + default => 0, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'e', + documentation => 'delete only if empty', +); + +__PACKAGE__->meta->make_immutable; +no Moose; + +sub abstract { + return 'delete a queue'; +} + +sub _validate_queue { + my ($self,) = @_; + + $self->_check_queue(); + return; +} + +sub _run { + my ($self, $client, $opt, $args,) = @_; + + my $method_frame = $client->delete_queue({ + (map {$_ => $self->$_} qw(queue if_unused if_empty)) + })->method_frame; + + print 'Deleted queue', "\n"; + print 'message_count: ', $method_frame->message_count, "\n"; + return; +} + +1; + diff --git a/lib/RabbitFoot/Cmd/Command/purge_queue.pm b/lib/RabbitFoot/Cmd/Command/purge_queue.pm new file mode 100644 index 0000000..e681399 --- /dev/null +++ b/lib/RabbitFoot/Cmd/Command/purge_queue.pm @@ -0,0 +1,41 @@ +package RabbitFoot::Cmd::Command::purge_queue; + +use Moose; +extends qw(MooseX::App::Cmd::Command); +with qw(RabbitFoot::Cmd::Role::Config RabbitFoot::Cmd::Role::Command); + +has queue => ( + isa => 'Str', + is => 'rw', + required => 1, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'q', + documentation => 'queue name', +); + +__PACKAGE__->meta->make_immutable; +no Moose; + +sub abstract { + return 'purge a queue'; +} + +sub _validate_queue { + my ($self,) = @_; + + $self->_check_queue(); + return; +} + +sub _run { + my ($self, $client, $opt, $args,) = @_; + + my $method_frame = $client->purge_queue({queue => $self->queue})->method_frame; + + print 'Purged queue', "\n"; + print 'message_count: ', $method_frame->message_count, "\n"; + return; +} + +1; + diff --git a/lib/RabbitFoot/Cmd/Role/Command.pm b/lib/RabbitFoot/Cmd/Role/Command.pm new file mode 100644 index 0000000..fc9ec91 --- /dev/null +++ b/lib/RabbitFoot/Cmd/Role/Command.pm @@ -0,0 +1,131 @@ +package RabbitFoot::Cmd::Role::Command; + +use FindBin; +use RabbitFoot; + +use Moose::Role; +requires qw(_run); + +has spec => ( + isa => 'Str', + is => 'rw', + default => $FindBin::Bin . '/amqp0-8.xml', + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 's', + documentation => 'AMQP specification', +); + +has host => ( + isa => 'Str', + is => 'rw', + default => 'localhost', + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'H', + documentation => 'host name or ip address', +); + +has port => ( + isa => 'Int', + is => 'rw', + default => 5672, + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'P', + documentation => 'port number', +); + +has user => ( + isa => 'Str', + is => 'rw', + default => 'guest', + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'u', + documentation => 'user name', +); + +has pass => ( + isa => 'Str', + is => 'rw', + default => 'guest', + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'p', + documentation => 'password', +); + +has vhost => ( + isa => 'Str', + is => 'rw', + default => '/', + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'v', + documentation => 'virtual host', +); + +has verbose => ( + isa => 'Bool', + is => 'rw', + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'V', + documentation => 'Verbose mode', +); + +no Moose::Role; + +sub validate_args { + my ($self, $opt, $args) = @_; + + for my $method ($self->meta->get_all_methods) { + next if $method->name !~ /^_validate_/; + eval {$method->execute($self);}; + die $self->usage_error($@) if $@; + } +} + +sub _validate_spec { + my ($self,) = @_; + + die 'spec', "\n" if !-f $self->spec; +} + +sub _validate_vhost { + my ($self,) = @_; + + die 'vhost', "\n" + if 255 < length($self->vhost) + || $self->vhost !~ m{^[a-zA-Z0-9/\-_]+$}; +} + +sub _check_queue { + my ($self,) = @_; + + die 'queue', "\n" + if 255 < length($self->queue) + || $self->queue !~ m{^[a-zA-Z0-9/\-_.:=+]+$}; +} + +sub _check_shortstr { + my ($self, $arg,) = @_; + + die $arg, "\n" + if 255 < length($self->$arg) + || $self->$arg !~ m{^[a-zA-Z0-9-_.:]+$}; +} + +sub execute { + my $self = shift; + my ($opt, $args,) = @_; + + my $client = RabbitFoot->new({ + verbose => $self->verbose, + timeout => 1, + })->load_xml_spec( + $self->spec, + )->connect({ + (map {$_ => $self->$_} qw(host port user pass vhost)) + }); + + $self->_run($client, @_,); + return; +} + +1; + diff --git a/lib/RabbitFoot/Cmd/Role/Config.pm b/lib/RabbitFoot/Cmd/Role/Config.pm new file mode 100644 index 0000000..c90816e --- /dev/null +++ b/lib/RabbitFoot/Cmd/Role/Config.pm @@ -0,0 +1,35 @@ +package RabbitFoot::Cmd::Role::Config; + +use FindBin; +use Config::Any; + +use Moose::Role; +with qw(MooseX::ConfigFromFile); + +has configfile => ( + isa => 'Str', + is => 'rw', + default => $FindBin::Bin . '/config.json', + metaclass => 'MooseX::Getopt::Meta::Attribute', + cmd_aliases => 'c', + documentation => 'config file', +); + +no Moose::Role; + +sub get_config_from_file { + my ($self, $file,) = @_; + + return {} if !$file || !-f $file; + + my $config = Config::Any->load_files({ + files => [$file], + use_ext => 1, + driver_args => {General => {-LowerCaseNames => 1}} + }); + + return $config->[0]->{$file} or die "Could not load $file"; +} + +1; + diff --git a/rabbit_foot b/rabbit_foot new file mode 100755 index 0000000..dd622c1 --- /dev/null +++ b/rabbit_foot @@ -0,0 +1,11 @@ +#!/usr/bin/env perl + +use strict; +use warnings; + +use FindBin; +use lib ($FindBin::Bin . '/lib'); + +use RabbitFoot::Cmd; +RabbitFoot::Cmd->run; + diff --git a/t/00_compile.t b/t/00_compile.t new file mode 100644 index 0000000..fcda7d3 --- /dev/null +++ b/t/00_compile.t @@ -0,0 +1,13 @@ +use strict; +use Test::More tests => 8; + +BEGIN { + use_ok 'RabbitFoot'; + use_ok 'RabbitFoot::Cmd'; + use_ok 'RabbitFoot::Cmd::Role::Command'; + use_ok 'RabbitFoot::Cmd::Role::Config'; + use_ok 'RabbitFoot::Cmd::Command::declare_queue'; + use_ok 'RabbitFoot::Cmd::Command::bind_queue'; + use_ok 'RabbitFoot::Cmd::Command::purge_queue'; + use_ok 'RabbitFoot::Cmd::Command::declare_exchange'; +} diff --git a/xt/01_podspell.t b/xt/01_podspell.t new file mode 100644 index 0000000..babf41f --- /dev/null +++ b/xt/01_podspell.t @@ -0,0 +1,13 @@ +use Test::More; +eval q{ use Test::Spelling }; +plan skip_all => "Test::Spelling is not installed." if $@; +add_stopwords(map { split /[\s\:\-]/ } ); +set_spell_cmd('aspell list'); +$ENV{LANG} = 'C'; +all_pod_files_spelling_ok('lib'); +__DATA__ +Masahito Ikuta +cooldaemon@gmail.com +RabbitFoot +AMQP +RabbitMQ diff --git a/xt/02_perlcritic.t b/xt/02_perlcritic.t new file mode 100644 index 0000000..b977df8 --- /dev/null +++ b/xt/02_perlcritic.t @@ -0,0 +1,8 @@ +use strict; +use Test::More; +eval { + require Test::Perl::Critic; + Test::Perl::Critic->import( -profile => 'xt/perlcriticrc'); +}; +plan skip_all => "Test::Perl::Critic is not installed." if $@; +all_critic_ok('lib'); diff --git a/xt/03_pod.t b/xt/03_pod.t new file mode 100644 index 0000000..437887a --- /dev/null +++ b/xt/03_pod.t @@ -0,0 +1,4 @@ +use Test::More; +eval "use Test::Pod 1.00"; +plan skip_all => "Test::Pod 1.00 required for testing POD" if $@; +all_pod_files_ok(); diff --git a/xt/04_use_server.t b/xt/04_use_server.t new file mode 100644 index 0000000..7522a82 --- /dev/null +++ b/xt/04_use_server.t @@ -0,0 +1,92 @@ +use Test::More; +use Test::Exception; + +use FindBin; +use JSON::Syck; + +my $conf = JSON::Syck::LoadFile($FindBin::Bin . '/../config.json'); + +eval { + use IO::Socket::INET; + + my $socket = IO::Socket::INET->new( + Proto => 'tcp', + PeerAddr => $conf->{host}, + PeerPort => $conf->{port}, + Timeout => 1, + ) or die 'Error connecting to AMQP Server!'; + + close $socket; +}; + +plan skip_all => 'Connection failure: ' + . $conf->{host} . ':' . $conf->{port} if $@; +plan tests => 13; + +use RabbitFoot; + +my $rf = RabbitFoot->new({timeout => 1, verbose => 1,}); + +lives_ok sub { + $rf->load_xml_spec($FindBin::Bin . '/../amqp0-8.xml') +}, 'load xml spec'; + +lives_ok sub { + $rf->connect({(map {$_ => $conf->{$_}} qw(host port user pass vhost))}); +}, 'connect'; + +lives_ok sub { + $rf->declare_exchange({exchange => 'test_x'}); +}, 'declare_exchange'; + +lives_ok sub { + $rf->declare_queue({queue => 'test_q'}); +}, 'declare_queue'; + +lives_ok sub { + $rf->bind_queue({ + queue => 'test_q', + exchange => 'test_x', + routing_key => 'test_r', + }); +}, 'bind_queue'; + +lives_ok sub { + $rf->publish( + { + exchange => 'test_x', + routing_key => 'test_r', + }, + {}, + 'Hello RabbitMQ.' + ); +}, 'publish'; + +lives_ok sub { + $rf->consume({queue => 'test_q'}); +}, 'consume'; + +lives_ok sub { + $rf->poll({timeout => 1}); +}, 'poll'; + +lives_ok sub { + $rf->cancel(); +}, 'cancel'; + +lives_ok sub { + $rf->purge_queue({queue => 'test_q'}); +}, 'purge_queue'; + +lives_ok sub { + $rf->delete_queue({queue => 'test_q'}); +}, 'delete_queue'; + +lives_ok sub { + $rf->delete_exchange({exchange => 'test_x'}); +}, 'delete_exchange'; + +lives_ok sub { + $rf->close(); +}, 'close'; + diff --git a/xt/perlcriticrc b/xt/perlcriticrc new file mode 100644 index 0000000..75419ce --- /dev/null +++ b/xt/perlcriticrc @@ -0,0 +1,8 @@ +[TestingAndDebugging::ProhibitNoStrict] +allow=refs + +[TestingAndDebugging::RequireUseStrict] +equivalent_modules = Any::Moose Moose Mouse + +[TestingAndDebugging::RequireUseWarnings] +equivalent_modules = Any::Moose Moose Mouse