Skip to content

baguette-io/baguette-messaging

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

baguette-messaging

image

Tiny Framework to build micro services. Currently only support amqp, rpc over amqp and http stream based micro services.

How it works

AMQP

We declare a publisher:
the method decorated takes one parameter publish (farine.amqp.publisher.Publisher)
import farine.amqp

class Publish(object):

    @farine.amqp.publish(exchange='consume', routing_key='routing_key')
    def publish_dummy(self, publish):
            """
            :param publish: Send the data through AMQP.
            :type publish: farine.amqp.publisher.Publisher
        """
        publish({'result':0})
And then a consumer :
the method decorated takes two parameters body (dict) and message (kombu.message.Message).
import farine.amqp

class Consume(object):

    @farine.amqp.consume(exchange='publish', routing_key='routing_key')
    def consume_dummy(self, body, message):
            """
            :param body: The message's data.
            :type body: dict
            :param message: Message class.
            :type message: kombu.message.Message
            """
        message.ack()

RPC over AMQP

We need declare two services :
the server : Wait for a call(consumer), and answer(publisher).
The method decorated just takes args and kwargs
import farine.rpc

class Server(object):

    @farine.rpc.method()
    def dummy(self, *args, **kwargs):
        return True
And the client : Send a call(publisher), and wait for an answer(consumer).
the method decorated takes one argument rpc (farine.rpc.client.Client).
The result will be a dictionnary.
import farine.rpc

class Client(object):

        @farine.rpc.client('myotherservice')
    def dummy(self, rpc):
            """
            :param rpc: The RPC client.
            :type rpc: farine.rpc.client.Client
            """
        result = rpc.dummy()

RPC Stream

We can also do streaming RPC call.
All you need to do is to add __stream__ = True* to your RPC call.
Also, a generator is returned.

Example:

import farine.rpc

class Server(object):

    @farine.rpc.method()
    def dummy(self, *args, **kwargs):
        yield 'a'
        yield 'b'
import farine.rpc

class Client(object):

    @farine.rpc.client('myotherservice')
        def dummy(self, rpc):
            """
            :param rpc: The RPC client.
            :type rpc: farine.rpc.client.Client
            """
        for result in rpc.dummy(__stream__=True):
                print result

HTTP Stream

We can declare a service that will listen to an HTTP SSE event :
the method decorated takes one argument data (dict).
In the configuration file, we just need to setup the endpoint to listen.
import farine.stream

class Client(object):

    @farine.stream.http()
    def listen_event(self, data):
            """
            :param data: The event sent.
            :type data: dict
            """
        return True

Execute method

If all we need is to execute a method, we can use farine.execute.method.
It will restart it, if it ends (customizable).
import farine.execute

class Client(object):

    @farine.execute.method()
    def my_own_stuff(self):
        """
        Your own code.
        """

Database

baguette-messaging is using peewee to manage databases connections (only postgresql is supported for the moment)
In order for it to detect that you are using a database, you need to create a models.py module.
Then, each time we enter into a method (amqp,rpc,stream) a database connection will be created and closed.
You can use the database connection using self.db, to manage transactions for example.

Example

models.py:

from farine.connectors.sql import *

class User(Model):
    name = CharField()

service.py:

import farine.amqp
from models import User

class Client(object):

    @farine.amqp.consume(exchange='exchange', routing_key='routing_key')
    def select(self, body, message):
        return User.select().where(User.id==1)

Overview

You can mix in a service, everything:
it can be a consumer of an HTTP stream, and send back the result in RPC, etc.

Example

import farine.rpc
import farine.stream

class Client(object):

        @farine.stream.http()
        def get(self, data):
            return self.send(data)

    @farine.rpc.client('myotherservice')
    def send(self, rpc, data):
        return rpc.process(data)

Configuration

By default the configuration file is located in /etc/farine.ini. You can override this path using the environment variable FARINE_INI.

It must contains one section by service (using the lowercase class name).
a DEFAULT section can also be present.

Example

[DEFAULT]
amqp_uri = amqp://baguette:baguette@127.0.0.1:5672/baguette

[consume]
enabled = true

Database

If you use a database connection, you have to add in the [DEFAULT] section the db parameters:

[DEFAULT]
db_connector = postgres (required)
db_name = name (required)
db_user = user (required)
db_host = host (required)
db_password = password (required)
db_port = port (optional)
db_max_conn = max_connections (optional)
db_stale_timeout = stale timeout (optional)
db_timeout = timeout (optional)

Launch

To launch a service, just run:

farine --start=mymodule

It will try to import mymodule.service and launch it.