New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow AMQP extensions to be passed when connecting to RMQ broker. #46

Closed
wants to merge 3 commits into
base: master
from
Jump to file or symbol
Failed to load files and symbols.
+65 −13
Diff settings

Always

Just for now

@@ -38,6 +38,9 @@ AMQTable_SetArrayValue(amqp_table_t*, amqp_bytes_t, amqp_array_t);
_PYRMQ_INLINE void
AMQTable_SetStringValue(amqp_table_t*, amqp_bytes_t, amqp_bytes_t);
_PYRMQ_INLINE void
AMQTable_SetBoolValue(amqp_table_t*, amqp_bytes_t, int);
_PYRMQ_INLINE void
AMQTable_SetIntValue(amqp_table_t *, amqp_bytes_t, int);
@@ -160,6 +163,15 @@ AMQTable_SetStringValue(amqp_table_t *table,
entry->value.value.bytes = value;
}
_PYRMQ_INLINE void
AMQTable_SetBoolValue(amqp_table_t *table,
amqp_bytes_t key, int value)
{
amqp_table_entry_t *entry = AMQTable_AddEntry(table, key);
entry->value.kind = AMQP_FIELD_KIND_BOOLEAN;
entry->value.value.boolean = value;
}
_PYRMQ_INLINE void
AMQTable_SetIntValue(amqp_table_t *table,
amqp_bytes_t key, int value)
@@ -266,20 +278,35 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool
PyString_AS_AMQBYTES(dkey),
PyIter_ToAMQArray(conn, dvalue, pool));
}
else if (PyBool_Check(dvalue)) {
/* Bool */
clong_value = 0; /* default false */
if (dvalue == Py_True)
clong_value = 1;
AMQTable_SetBoolValue(&dst,
PyString_AS_AMQBYTES(dkey),
clong_value);
}
else if (PyLong_Check(dvalue) || PyInt_Check(dvalue)) {
/* Int | Long */
clong_value = (int64_t)PyLong_AsLong(dvalue);
if (PyErr_Occurred())
goto error;
if (clong_value == -1)
goto error;
AMQTable_SetIntValue(&dst,
PyString_AS_AMQBYTES(dkey),
clong_value
);
}
else if (PyFloat_Check(dvalue)) {
cdouble_value = PyFloat_AsDouble(dvalue);
if (PyErr_Occurred())
goto error;
if (cdouble_value == -1)
goto error;
AMQTable_SetDoubleValue(&dst,
PyString_AS_AMQBYTES(dkey),
cdouble_value
@@ -984,11 +1011,18 @@ PyRabbitMQ_Connection_fileno(PyRabbitMQ_Connection *self)
* Connection.connect()
*/
static PyObject*
PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self, PyObject *args)
{
int status;
amqp_socket_t *socket = NULL;
amqp_rpc_reply_t reply;
PyObject *client_properties;
amqp_pool_t pool;
amqp_table_t properties;
if(!PyArg_ParseTuple(args, "|O", &client_properties)) {
goto bail;
}
if (self->connected) {
PyErr_SetString(PyRabbitMQExc_ConnectionError, "Already connected");
@@ -1012,12 +1046,25 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
Py_BEGIN_ALLOW_THREADS;
self->sockfd = amqp_socket_get_sockfd(socket);
reply = amqp_login(self->conn, self->virtual_host, self->channel_max,
self->frame_max, self->heartbeat,
AMQP_SASL_METHOD_PLAIN, self->userid, self->password);
if (PyDict_Check(client_properties)) {
init_amqp_pool(&pool, self->frame_max);
properties = PyDict_ToAMQTable(self->conn, client_properties, &pool);
reply = amqp_login_with_properties(self->conn, self->virtual_host, self->channel_max,
self->frame_max, self->heartbeat,
&properties,
AMQP_SASL_METHOD_PLAIN, self->userid, self->password);
} else {
reply = amqp_login(self->conn, self->virtual_host, self->channel_max,
self->frame_max, self->heartbeat,
AMQP_SASL_METHOD_PLAIN, self->userid, self->password);
}
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, 0, reply, "Couldn't log in"))
goto bail;
goto bail;
/* after tune */
self->connected = 1;
@@ -165,7 +165,7 @@ static PyObject*
PyRabbitMQ_Connection_fileno(PyRabbitMQ_Connection *);
static PyObject*
PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *);
PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *, PyObject *);
static PyObject*
PyRabbitMQ_Connection_close(PyRabbitMQ_Connection *);
@@ -263,7 +263,7 @@ static PyMethodDef PyRabbitMQ_ConnectionType_methods[] = {
{"fileno", (PyCFunction)PyRabbitMQ_Connection_fileno,
METH_NOARGS, "File descriptor number."},
{"connect", (PyCFunction)PyRabbitMQ_Connection_connect,
METH_NOARGS, "Establish connection to the broker."},
METH_VARARGS, "Establish connection to the broker."},
{"_close", (PyCFunction)PyRabbitMQ_Connection_close,
METH_NOARGS, "Close connection."},
{"_channel_open", (PyCFunction)PyRabbitMQ_Connection_channel_open,
View
@@ -173,9 +173,14 @@ class Connection(_librabbitmq.Connection):
def __init__(self, host='localhost', userid='guest', password='guest',
virtual_host='/', port=5672, channel_max=0xffff,
frame_max=131072, heartbeat=0, lazy=False, **kwargs):
frame_max=131072, heartbeat=0, lazy=False, capabilities=None, **kwargs):
if ':' in host:
host, port = host.split(':')
client_properties = None
if capabilities:
client_properties = {'capabilities': capabilities}
super(Connection, self).__init__(hostname=host, port=int(port),
userid=userid, password=password,
virtual_host=virtual_host,
@@ -185,7 +190,7 @@ def __init__(self, host='localhost', userid='guest', password='guest',
self.channels = {}
self._avail_channel_ids = array('H', xrange(self.channel_max, 0, -1))
if not lazy:
self.connect()
self.connect(client_properties)
def __enter__(self):
return self