Permalink
Browse files

First commit

  • Loading branch information...
0 parents commit 60852d1e9e03010edf8b22bf75e860837792e21e Jonathan Stoppani committed Aug 22, 2009
Showing with 596 additions and 0 deletions.
  1. +2 −0 .gitignore
  2. +6 −0 INSTALL
  3. +19 −0 LICENSE
  4. +19 −0 README
  5. 0 amqplib_thrift/__init__.py
  6. +95 −0 amqplib_thrift/factories.py
  7. +132 −0 amqplib_thrift/transports.py
  8. +49 −0 example/client.py
  9. +65 −0 example/server.py
  10. +36 −0 example/shared.thrift
  11. +152 −0 example/tutorial.thrift
  12. +21 −0 setup.py
@@ -0,0 +1,2 @@
+*.pyc
+gen-py
@@ -0,0 +1,6 @@
+INSTALLATION
+============
+
+From the command-line run:
+
+ python setup.py install
19 LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2009 Jonathan Stoppani
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
19 README
@@ -0,0 +1,19 @@
+README
+======
+
+This library allows the creation of thrift clients and servers using AMQP
+with the pyamqplib for the communication.
+
+For installation instructions see the INSTALL file in this directory.
+
+For more information about thrift, visit
+
+ http://incubator.apache.org/thrift/
+
+For more information about pyamqplib, visit:
+
+ http://code.google.com/p/py-amqplib/
+
+For more information about AMQP, visit
+
+ http://www.amqp.org
No changes.
@@ -0,0 +1,95 @@
+from amqplib_thrift.transports import TAMQServerTransport, TAMQTransport
+from thrift.protocol.TBinaryProtocol import TBinaryProtocol, \
+ TBinaryProtocolFactory
+from thrift.server.TServer import TSimpleServer
+from thrift.transport.TTransport import TTransportFactoryBase, TMemoryBuffer
+
+class TAMQInputTransportFactory(TTransportFactoryBase):
+ """
+ Input transport factory to be used with a TServer instance.
+ """
+
+ def getTransport(self, msg):
+ """
+ Creates and returns a TMemoryBuffer instance with the content of
+ the amqp.Message instance passed as argument.
+ """
+ return TMemoryBuffer(msg.body)
+
+class TAMQOutputTransportFactory(TTransportFactoryBase):
+ """
+ Output transport factory to be used with a TServer instance.
+ """
+
+ def __init__(self, channel, exchange):
+ """
+ Creates a new output factory instance for the given channel and
+ exchange.
+ """
+ self.channel = channel
+ self.exchange = exchange
+
+ def getTransport(self, msg):
+ """
+ Creates and return a new TAMQTransport instance using the `reply_to`
+ property of the amqp.Message argument to define the routing_key for
+ the response.
+ """
+ routing_key = msg.properties['reply_to']
+ return TAMQTransport(self.channel, self.exchange, routing_key)
+
+class TAMQFactory(object):
+ """
+ Factory for AMQP based thrift clients and servers.
+ """
+
+ def __init__(self, channel, services_exchange='services',
+ responses_exchange='responses', protocol_factory=None):
+ """
+ Creates a new factory operating over the given channel, sending
+ messages to the given services and responses exchanges and using
+ the protocols created by the given protocol factory.
+ """
+ self.channel = channel
+ self.services_exchange = services_exchange
+ self.responses_exchange = responses_exchange
+
+ self.protocol_factory = protocol_factory or TBinaryProtocolFactory()
+
+ self.channel.exchange_declare(self.services_exchange, "direct")
+ self.channel.exchange_declare(self.responses_exchange, "direct")
+
+ def get_client(self, client_class, routing_key):
+ """
+ Creates a new client for the given class sending messages to the
+ hanlder with the given routing_key.
+ """
+ queue, _, _ = self.channel.queue_declare(exclusive=True,
+ auto_delete=True)
+ self.channel.queue_bind(queue, self.responses_exchange, queue)
+
+ transport = TAMQTransport(self.channel, self.services_exchange,
+ routing_key, queue)
+
+ return client_class(self.protocol_factory.getProtocol(transport))
+
+ def get_server(self, processor, routing_key, server_class=None):
+ """
+ Creates a new server for the given processor, using the `server_class`
+ class (defaults to `TSimpleServer`) and listening to messages with the
+ given routing key sent to the `services_exchange`
+ """
+ queue, _, _ = self.channel.queue_declare(exclusive=True,
+ auto_delete=True)
+ self.channel.queue_bind(queue, self.services_exchange, routing_key)
+
+ server_class = server_class or TSimpleServer
+
+ server_transport = TAMQServerTransport(self.channel, queue)
+
+ itrans_factory = TAMQInputTransportFactory()
+ otrans_factory = TAMQOutputTransportFactory(self.channel,
+ self.responses_exchange)
+
+ return server_class(processor, server_transport, itrans_factory,
+ otrans_factory, self.protocol_factory, self.protocol_factory)
@@ -0,0 +1,132 @@
+from cStringIO import StringIO
+from amqplib import client_0_8 as amqp
+from thrift.transport.TTransport import TTransportBase, TServerTransportBase
+
+class TAMQServerTransport(TServerTransportBase):
+ """
+ An implementation of TServerTransportBase to support AMQP messaging.
+ """
+
+ def __init__(self, channel, queue):
+ """
+ Creates a new AMQP ServerTransport for the given channel and listening
+ on the given queue.
+ """
+ self.channel = channel
+ self.queue = queue
+ self.msg = None
+
+ def listen(self):
+ """
+ Starts listening.
+ """
+ self.channel.basic_consume(self.queue, callback=self.incomingMessage,
+ no_ack=True)
+
+ def accept(self):
+ """
+ Waits for a message to be dispatched to the queue and returns it.
+ If a message was already received, returns immediately.
+ """
+ if not self.msg:
+ self.channel.wait()
+
+ msg = self.msg
+ self.msg = None
+
+ return msg
+
+ def incomingMessage(self, msg):
+ """
+ Callback for the `basic_consume` method. Sets the `msg` istance
+ property to the received amqp.Message instance.
+ """
+ self.msg = msg
+
+class TAMQTransport(TTransportBase):
+ """
+ AMQP transport implementation.
+ """
+
+ def __init__(self, channel, exchange, routing_key, queue=None):
+ """
+ Creates a new TAMQTransport instance.
+
+ @param channel: The AMQP channel to use for the client-server
+ communication
+ @type channel: an L{amqp.Channel}
+
+ @param exchange: The name of the exchange to which send messages.
+ @type exchange: a C{string}
+
+ @param routing_key: The rouring_key to use for outgoing messages.
+ @type routing_key: a C{string}
+
+ @param queue: The queue on which to listen when used for clients.
+ This is the response queue for non `oneway` requests.
+ If setted, the `reply_to` property of outgoing messages
+ will be setted to the queue name.
+ @type queue: a C{string}
+ """
+
+ # Input and output buffers
+ self.__wbuf = StringIO()
+ self.__rbuf = None
+
+ self.channel = channel
+ self.exchange = exchange
+ self.routing_key = routing_key
+ self.reply_to = queue
+
+ if queue:
+ channel.basic_consume(queue, callback=self._incoming_message,
+ no_ack=True)
+
+ def read(self, sz):
+ """
+ Read sz bytes from the input buffer.
+ If the input buffer is empty, wait for data.
+ """
+ if not self.__rbuf:
+ self.channel.wait()
+
+ chunk = self.__rbuf.read(sz)
+
+ if not len(chunk):
+ # Buffer is empty, time to listen for the next message
+ self.__rbuf = None
+ return self.read(sz)
+
+ return chunk
+
+ def write(self, buf):
+ """
+ Writes some data to the output buffer.
+ """
+ self.__wbuf.write(buf)
+
+ def flush(self):
+ """
+ Sends the message and clears the output buffer.
+ """
+ message = self.__wbuf.getvalue()
+ self.__wbuf = StringIO()
+
+ kwargs = {'application_headers': {
+ 'thriftClientName' : self.routing_key
+ }}
+
+ if self.reply_to:
+ kwargs['reply_to'] = self.reply_to
+
+ msg = amqp.Message(message, **kwargs)
+
+ self.channel.basic_publish(msg, self.exchange, self.routing_key)
+
+ def _incoming_message(self, msg):
+ """
+ Callback for incoming messages.
+ """
+ self.__rbuf = StringIO(msg.body)
+
+
@@ -0,0 +1,49 @@
+import sys
+import os
+from amqplib_thrift.factories import TAMQFactory
+from amqplib import client_0_8 as amqp
+
+# Set up sys.path to include thrift services
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'gen-py'))
+
+# Thrift services
+from tutorial import Calculator
+from tutorial.ttypes import Work, Operation, InvalidOperation
+
+# Set up connection
+connection = amqp.Connection(host="10.0.0.100")
+channel = connection.channel()
+
+# Set up factory
+factory = TAMQFactory(channel)
+
+# Get the client
+calculator = factory.get_client(Calculator.Client, 'calculator')
+
+# Calculate something
+calculator.ping()
+
+print "4 + 5 = ", calculator.add(4, 5)
+
+w = Work(num1=20, num2=10, op=Operation.MULTIPLY)
+print "20 * 10 = ", calculator.calculate(1, w)
+
+w = Work(num1=2, num2=0, op=Operation.DIVIDE)
+try:
+ result = calculator.calculate(2, w)
+except InvalidOperation, e:
+ print "Operation %d failed with message '%s'" % (e.what, e.why)
+else:
+ print "2 / 0 = ", result
+
+print "Zipping...",
+calculator.zip()
+print "done"
+
+print "Pinging...",
+calculator.ping()
+print "done"
+
+# Tear down the connection
+channel.close()
+connection.close()
Oops, something went wrong.

0 comments on commit 60852d1

Please sign in to comment.