This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

kombu.exceptions.ContentDisallowed: Refusing to deserialize disabled content of type pickle (application/x-python-serialize) #32

Closed
femtotrader opened this issue Jun 27, 2014 · 15 comments

@femtotrader
Contributor

Hello,

Kombu seems to disallow pickled objects.
I think they did it for security reasons (should we care?).

How can I enable it?

Is there a way to make callme use JSON-RPC instead?

Kind regards
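
On the "should we care?" question, here is a minimal sketch (standard library only, Python 3) of why brokers and clients are cautious about pickle: loading a pickle can call any importable callable chosen by whoever produced the bytes. The `Payload` class is hypothetical and uses a harmless `eval` expression. As far as I know, kombu lets a consumer opt back in through its `accept` argument or `kombu.enable_insecure_serializers`, but check the kombu documentation for your version before relying on either.

```python
import pickle


class Payload(object):
    """Hypothetical attacker-controlled object (illustrative name only)."""

    def __reduce__(self):
        # pickle stores "call eval('6 * 7')" instead of the object itself.
        # A real attacker could name any importable callable here.
        return (eval, ("6 * 7",))


blob = pickle.dumps(Payload())   # the bytes that would travel over the broker
result = pickle.loads(blob)      # the callable runs on the receiving side

# The receiver gets whatever the callable returned, not a Payload instance.
print(type(result).__name__, result)
```

So enabling pickle is reasonable only when every producer on the broker is trusted.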

@skudriashev
Collaborator

@femtotrader, what callme version do you use?

@femtotrader
Contributor Author

@skudriashev Sorry, I wasn't using the GitHub version. After a git clone and python setup.py install, it works fine.
Any comment about JSON-RPC?

@skudriashev
Collaborator

@femtotrader, we discussed this with @ceelian several times. First question: why do you really need JSON instead of pickle? Are there any cases that would require JSON? Second, how would you handle exceptions from the remote end? AFAIK, they are currently pickled and delivered to the client side.

@ceelian can give more detail on that.

@femtotrader
Contributor Author

@ceelian @skudriashev It's quite a complicated use case. My Python script needs to send JSON-RPC requests to a database where another piece of software (not written in Python) is able to execute them. I just want to use messaging so that JSON-RPC can be sent from any language. My first idea was to do it myself, so I wrote an RPC example with Pika (RabbitMQ). Afterwards I found callme, which seems interesting (and probably much more professional than my ugly code, see below). About exceptions: JSON-RPC provides error codes such as -32600 (Invalid Request). We could also use another error code and use traceback to put the exception message into the JSON-RPC response.

json_rpc_amqp_server.py

import logging
import logging.config
import traceback
import click
import pyjsonrpc
import decimal
from json_rpc_amqp import JsonRpcAmqpServer, JSON_RPC_Encoder_Decoder

def add(a, b):
    """Test function"""
    #a = decimal.Decimal(a)
    #b = decimal.Decimal(b)
    return a + b

def fib(n):
    """Fibonacci function"""
    #n = int(n)
    #n = decimal.Decimal(n)
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

@click.command()
@click.option('--host', default='localhost', help='host')
@click.option('--username', default='guest', help='login')
@click.option('--password', default='guest', help='password')
@click.option('--queue', default='rpc_queue', help='queue')
@click.option('--purge/--no-purge', default=True)
def main(host, username, password, queue, purge):
    enc = JSON_RPC_Encoder_Decoder()
    json_rpc_server = JsonRpcAmqpServer(host, username, password, queue, purge, enc)
    json_rpc_server.register_functions(
        {
            "add": add,
            "fib": fib
        })
    json_rpc_server.start()

if __name__ == '__main__':
    logging.config.fileConfig("logging.conf")
    logger = logging.getLogger("simpleExample")
    main()

json_rpc_amqp_client.py

import logging
import logging.config
import traceback
import click
import json
import datetime
from json_rpc_amqp import JsonRpcAmqpClient, JSON_RPC_Encoder_Decoder

@click.command()
@click.option('--host', default='localhost', help='host')
@click.option('--username', default='guest', help='login')
@click.option('--password', default='guest', help='password')
@click.option('--queue', default='rpc_queue', help='queue')
@click.option('--a', default=10, help='a')
@click.option('--b', default=15, help='b')
@click.option('--method', default='fib', help='method')
@click.option('--timeout', default=2, help='timeout')
def main(host, username, password, queue, timeout, method, a, b):
    enc = JSON_RPC_Encoder_Decoder()
    json_rpc_client = JsonRpcAmqpClient(host, username, password, queue, timeout, enc)

    if method=='fib':
        response = json_rpc_client.call("fib", a)
        logging.info("fib(%d)=%d" % (a, response))
        logging.info(response)
        logging.info(type(response))

    elif method=='fiberrarg':
        response = json_rpc_client.call("fib", a, 10)

    elif method=='fiberrmeth':
        response = json_rpc_client.call("fibx", a, 10)

    elif method=='add':
        response = json_rpc_client.call("add", a, b)
        logging.info("a+b=%d+%d=%d" % (a, b, response))
        logging.info(response)
        logging.info(type(response))

    elif method=='adddt':
        response = json_rpc_client.call("add", datetime.datetime.utcnow(), datetime.timedelta(days=1))

    elif method=='notjson':
        import pika
        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(host=host, credentials=credentials)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.basic_publish(exchange='', routing_key=queue, body="bad_request")

    else:
        response = json_rpc_client.call("fibx", a)

if __name__ == '__main__':
    logging.config.fileConfig("logging.conf")
    logger = logging.getLogger("simpleExample")
    main()

json_rpc_amqp.py

import pika
import uuid
import logging
import traceback

import pyjsonrpc
import pyjsonrpc.rpcerror as rpcerror
import pyjsonrpc.rpcresponse as rpcresponse

import datetime
import json
from bson import json_util
#import anyjson as json

from abc import ABCMeta, abstractmethod

class MyException(Exception):
    pass

class RPC_Encoder_Decoder(object):
    __metaclass__ = ABCMeta

    def __init__(self):
        self.d_funcs = None

    def functions_registered(self):
        return(self.d_funcs is not None)

    @abstractmethod
    def encode(self, data): pass

    @abstractmethod
    def _decode(self, data): pass

    @abstractmethod
    def decode(self, data): pass

    @abstractmethod
    def register_functions(self, d_funcs): pass

    @abstractmethod
    def call(self, rpc_request): pass

class JSON_RPC_Encoder_Decoder(RPC_Encoder_Decoder):
    def encode(self, data):
        return(json.dumps(data, default=json_util.default))

    def _decode(self, data):
        #return(json.loads(data, default=json_default))
        return(json.loads(data, object_hook=json_util.object_hook))

    def decode(self, data):
        data = self._decode(data)
        if "result" in data:
            return(data["result"])
        else:
            raise(MyException(rpcerror.jsonrpcerrors[data["error"]["code"]]))
            #try:
            #    raise(rpcerror.jsonrpcerrors[data["error"]["code"]])
            #except:
            #    logging.error(traceback.format_exc())

    def register_functions(self, d_funcs):
        self.d_funcs = d_funcs
        self.rpc = pyjsonrpc.JsonRpc(methods = d_funcs)

    def call(self, rpc_request):
        rpc_response = self.rpc.call(rpc_request)
        return(rpc_response)


class JsonRpcAmqpClient(object):
    def __init__(self, host, username, password, queue, timeout, enc_decoder=None):
        if enc_decoder is None:
            self.enc_decoder = JSON_RPC_Encoder_Decoder()
        else:
            self.enc_decoder = enc_decoder

        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(host=host, credentials=credentials)
        self.connection = pika.BlockingConnection(parameters)
        if timeout>0:
            self.connection.add_timeout(timeout, self.on_response)

        self.channel = self.connection.channel()

        self.queue = queue

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        logging.debug("queue: %r" % queue)

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, channel=None, method=None, properties=None, body=None):
        if channel is None or method is None or properties is None or body is None:
            logging.error("Deadline (timeout)")
            self.response = None
            self.connection.close()
            #raise(NotImplementedError)
        else:
            if self.correlation_id == properties.correlation_id:
                self.response = body
                self.dt_response = datetime.datetime.utcnow()

    def call(self, method, *args, **kwargs):
        rpc_request = pyjsonrpc.create_request_dict(method, *args, **kwargs)
        self.dt_call = datetime.datetime.utcnow()
        self.response = None
        #self.correlation_id = str(uuid.uuid4()) # UUID4 (random)
        self.correlation_id = rpc_request["id"] # get request_id from dict
        rpc_request = self.enc_decoder.encode(rpc_request) # dict -> str
        #rpc_request = self.enc_decoder.encode(rpc_request) + "bad" # badly formated JSON (for test)

        logging.debug(" [->] Sending request to queue %r" % self.queue)
        logging.debug("request: %r" % rpc_request)
        logging.debug("correlation_id: %r" % self.correlation_id)
        logging.debug("reply_to: %r" % self.callback_queue)
        self.channel.basic_publish(exchange='',
                                   routing_key=self.queue,
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.correlation_id,
                                         ),
                                   body=rpc_request)
        while self.response is None:
            self.connection.process_data_events()
        logging.debug(" [<-] Got response on queue %r" % self.callback_queue)
        logging.debug("response: %r" % (self.response))
        logging.debug("rpc execution delay: %s" % (self.dt_response-self.dt_call))

        return(self.enc_decoder.decode(self.response))

class JsonRpcAmqpServer(object):
    def __init__(self, host, username, password, queue, purge_at_startup, enc_decoder=None):
        if enc_decoder is None:
            self.enc_decoder = JSON_RPC_Encoder_Decoder()
        else:
            self.enc_decoder = enc_decoder

        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(host=host, credentials=credentials)
        self.connection = pika.BlockingConnection(parameters)

        self.channel = self.connection.channel()

        self.channel.queue_declare(queue=queue)

        if purge_at_startup:
            self.channel.queue_purge(queue) # purge queue before starting server

        self.channel.basic_qos(prefetch_count=1)

        logging.debug("Awaiting RPC requests on queue %r" % queue)

        self.channel.basic_consume(self.on_request, queue=queue)

        self.rpc = None

    def start(self):
        if self.enc_decoder.functions_registered():
            self.channel.start_consuming()
        else:
            raise(MyException("no RPC function registered (use register_functions)"))

    def register_functions(self, d_funcs):
        self.enc_decoder.register_functions(d_funcs)

    def on_request(self, channel, method, properties, rpc_request):
        logging.debug(" [->] Receiving request")
        logging.debug("request: %r" % (rpc_request,))

        try:
            #rpc_response = self.rpc.call(rpc_request)
            rpc_response = self.enc_decoder.call(rpc_request)
        except rpcerror.ParseError as e:
            logging.error("Can't call %r" % rpc_request)
            logging.error(traceback.format_exc())
            #logging.error(e)

            #rpc_response = rpcresponse.Response(
            #    jsonrpc = jsonrpc,
            #    id = None,
            #    error = rpcerror.ParseError()
            #)

            #print(rpc_response)

            #ToFix
            #rpc_response = rpcerror.ParseError().toJSON()
            #print(type(rpc_response))
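            # Acknowledge the malformed request so it does not stay unacked;
            # with prefetch_count=1 an unacked message would stall the consumer.
            channel.basic_ack(delivery_tag = method.delivery_tag)
            return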

        logging.debug(" [<-] Sending response")
        logging.debug("response: %r" % (rpc_response,))
        logging.debug("correlation_id: %r" % properties.correlation_id)
        logging.debug("reply_to: %r" % properties.reply_to)

        channel.basic_publish(exchange='',
                     routing_key=properties.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                     properties.correlation_id),
                     body=rpc_response)
        channel.basic_ack(delivery_tag = method.delivery_tag)
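
The idea mentioned above, returning an error code and putting the traceback into the JSON-RPC response, could be sketched as follows. This is a standalone Python 3 illustration using only the standard library, not part of the listing above. The helper names `make_error_response` and `safe_call` are hypothetical, and the code -32000 is chosen as an assumption because the JSON-RPC 2.0 specification reserves -32000 to -32099 for implementation-defined server errors.

```python
import json
import traceback


def make_error_response(request_id, exc, code=-32000):
    """Build a JSON-RPC 2.0 error response string for a caught exception."""
    tb_text = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__))
    return json.dumps({
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {
            "code": code,
            "message": "%s: %s" % (type(exc).__name__, exc),
            # "data" is a free-form member; the traceback goes there as text.
            "data": {"traceback": tb_text},
        },
    })


def safe_call(func, request_id, *args):
    """Call func and wrap either its result or its exception as JSON-RPC."""
    try:
        result = func(*args)
    except Exception as exc:
        return make_error_response(request_id, exc)
    return json.dumps({"jsonrpc": "2.0", "id": request_id, "result": result})


def divide(a, b):
    return a / b


response = json.loads(safe_call(divide, 1, 1, 0))
print(response["error"]["code"], response["error"]["message"])
```

The client then sees the remote failure as text and a code, which any language can parse, at the cost of losing the original exception object.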

@skudriashev
Collaborator

@femtotrader, does your implementation work as intended?

@femtotrader
Contributor Author

Yes, it works nearly as expected (a timeout on the server side is still missing, but I can live without that feature for now). Still, I would prefer callme to have a setting for choosing the serializer/deserializer (and so being able to select JSON).

@skudriashev
Collaborator

@femtotrader, supporting a JSON serializer means handling exceptions in different ways for different serializers, which I don't really like.

@femtotrader
Contributor Author

Can't we have an abstract class (with the same interface as my RPC_Encoder_Decoder) and several encoder/decoder implementations: Pickle_RPC_Encoder_Decoder, JSON_RPC_Encoder_Decoder, XML_RPC_Encoder_Decoder, BSON_RPC_Encoder_Decoder, MessagePack_RPC_Encoder_Decoder, ProtocolBuffers_RPC_Encoder_Decoder... (just to name some ideas)? Supporting only two (Pickle and JSON-RPC) could be a first step.
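
A minimal sketch of that proposal, assuming Python 3 and only the standard library. The class names, the `SERIALIZERS` registry and `get_serializer` are hypothetical and are not callme's API; the pickle content type is the one quoted in this issue's title.

```python
import json
import pickle
from abc import ABC, abstractmethod


class Serializer(ABC):
    """Common interface every encoder/decoder has to implement."""
    content_type = None

    @abstractmethod
    def dumps(self, obj):
        """Turn a Python object into bytes for the message body."""

    @abstractmethod
    def loads(self, data):
        """Turn a message body back into a Python object."""


class PickleSerializer(Serializer):
    content_type = "application/x-python-serialize"

    def dumps(self, obj):
        return pickle.dumps(obj)

    def loads(self, data):
        return pickle.loads(data)


class JsonSerializer(Serializer):
    content_type = "application/json"

    def dumps(self, obj):
        return json.dumps(obj).encode("utf-8")

    def loads(self, data):
        return json.loads(data.decode("utf-8"))


# The "setting" is then just a name looked up in a registry.
SERIALIZERS = {"pickle": PickleSerializer, "json": JsonSerializer}


def get_serializer(name):
    try:
        return SERIALIZERS[name]()
    except KeyError:
        raise ValueError("unknown serializer: %r" % name)


request = {"method": "add", "params": [1, 2]}
round_trips = {
    name: get_serializer(name).loads(get_serializer(name).dumps(request))
    for name in SERIALIZERS
}
print(round_trips)
```

This covers the payload format only; how each serializer carries exceptions is a separate question.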

@skudriashev
Collaborator

@femtotrader, I mean that currently you can do:

proxy = Proxy(...)
try:
    proxy.add()
except:
    ...

You are able to 'catch' a remote exception locally. How would that work with JSON?
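
To illustrate the behaviour described here, a hedged sketch (Python 3, standard library only) of how a pickled exception lets the caller catch the original exception type. The functions `serve` and `call` are hypothetical stand-ins for the two ends of the RPC and do not reproduce callme's internals; no broker is involved.

```python
import pickle


def remote_add(a, b):
    return a + b          # raises TypeError for int + str


def serve(func, *args):
    """Hypothetical server side: pickle either the result or the exception."""
    try:
        return pickle.dumps(("ok", func(*args)))
    except Exception as exc:
        return pickle.dumps(("error", exc))


def call(func, *args):
    """Hypothetical client side: re-raise whatever exception was pickled."""
    status, value = pickle.loads(serve(func, *args))
    if status == "error":
        raise value
    return value


caught = None
try:
    call(remote_add, 1, "x")
except TypeError as exc:      # the original exception class survives
    caught = exc

print(type(caught).__name__, caught)
```

With JSON there is no equivalent way to ship the exception object itself, so the client has to rebuild something from a code and a message.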

@femtotrader
Contributor Author

The exception is raised in my JSON_RPC_Encoder_Decoder.decode method, so we can catch it locally whether the format is JSON, XML, Pickle, or whatever (but maybe I don't understand what you are saying). See the updated code with the abstract class.

@skudriashev
Collaborator

@femtotrader, in your code you raise MyException, not the original exception.

@skudriashev
Collaborator

@femtotrader, any updates on this? :)

@femtotrader
Contributor Author

raise(rpcerror.jsonrpcerrors[data["error"]["code"]]) was my solution.
The project http://spyne.io/ seems interesting, but it doesn't support AMQP (neither directly through Pika nor through Kombu).

see arskom/spyne#374

@skudriashev
Collaborator

@femtotrader, spyne.io might be what you need, since raising an error code is not the same as raising the exception object from the remote side.

Since this issue's title is not related to what we discussed here, I'm closing it.

@femtotrader
Contributor Author

It does raise an exception (inherited from RuntimeError); see https://github.com/gerold-penz/python-jsonrpc/blob/master/pyjsonrpc/rpcerror.py
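
A rough sketch of that approach, assuming Python 3: look up an exception class by JSON-RPC error code and raise it on the client. The class names and the `result_or_raise` helper are hypothetical and do not reproduce pyjsonrpc's actual module; the numeric codes are the standard ones from the JSON-RPC 2.0 specification.

```python
class JsonRpcError(RuntimeError):
    """Hypothetical base class for errors rebuilt from a JSON-RPC response."""
    code = None

    def __init__(self, message=None, data=None):
        RuntimeError.__init__(self, message)
        self.data = data


class ParseError(JsonRpcError):
    code = -32700


class InvalidRequest(JsonRpcError):
    code = -32600


class MethodNotFound(JsonRpcError):
    code = -32601


class InvalidParams(JsonRpcError):
    code = -32602


class InternalError(JsonRpcError):
    code = -32603


ERRORS_BY_CODE = dict(
    (cls.code, cls)
    for cls in (ParseError, InvalidRequest, MethodNotFound,
                InvalidParams, InternalError))


def result_or_raise(response):
    """Return the result of a decoded response, or raise the mapped error."""
    error = response.get("error")
    if error is None:
        return response["result"]
    # Unknown codes fall back to the generic base class.
    cls = ERRORS_BY_CODE.get(error["code"], JsonRpcError)
    raise cls(error.get("message"), error.get("data"))


reply = {"jsonrpc": "2.0", "id": 1,
         "error": {"code": -32601, "message": "Method not found"}}
caught = None
try:
    result_or_raise(reply)
except MethodNotFound as exc:
    caught = exc

print(type(caught).__name__, caught)
```

The caller can catch by class, but only classes in the code table are distinguishable: a remote ZeroDivisionError would still arrive as a generic error, which is the difference raised earlier in the thread.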

@ceelian ceelian added this to the v0.2 milestone Jul 12, 2014