Skip to content
Browse files

first cut (not 100% test coverage)

  • Loading branch information...
1 parent 74e00c8 commit 1733cae4dc6bceb15acdcdbd391509e7cb4eed81 @mcdonc mcdonc committed Jan 30, 2011
Showing with 587 additions and 172 deletions.
  1. +232 −98 pyramid_xmlrpc/__init__.py
  2. +355 −74 pyramid_xmlrpc/tests.py
View
330 pyramid_xmlrpc/__init__.py
@@ -1,119 +1,253 @@
+import inspect
import xmlrpclib
-import webob
-
-def xmlrpc_marshal(data):
- """ Marshal a Python data structure into an XML document suitable
- for use as an XML-RPC response and return the document. If
- ``data`` is an ``xmlrpclib.Fault`` instance, it will be marshalled
- into a suitable XML-RPC fault response."""
- if isinstance(data, xmlrpclib.Fault):
- return xmlrpclib.dumps(data)
- else:
- return xmlrpclib.dumps((data,), methodresponse=True)
-
-def xmlrpc_response(data):
- """ Marshal a Python data structure into a webob ``Response``
- object with a body that is an XML document suitable for use as an
- XML-RPC response with a content-type of ``text/xml`` and return
- the response."""
- xml = xmlrpc_marshal(data)
- response = webob.Response(xml)
- response.content_type = 'text/xml'
- response.content_length = len(xml)
- return response
+
+from zope.interface import implements
+from zope.interface import providedBy
+
+from pyramid.interfaces import IViewClassifier
+from pyramid.interfaces import IView
+from pyramid.interfaces import IViewMapperFactory
+
+from pyramid.events import NewRequest
+from pyramid.exceptions import NotFound
+from pyramid.exceptions import Forbidden
+from pyramid.traversal import traverse
+from pyramid.security import has_permission
+from pyramid.view import view_config
+
+# MapplyViewMapper is not an API; it may be moved into another package later
+
+class MapplyViewMapper(object):
+ implements(IViewMapperFactory)
+ def __init__(self, **kw):
+ self.attr = kw.get('attr')
+
+ def __call__(self, view):
+ attr = self.attr
+ if inspect.isclass(view):
+ def _class_view(context, request):
+ params = getattr(request, 'xmlrpc_params', ())
+ keywords = dict(request.params.items())
+ if request.matchdict:
+ keywords.update(request.matchdict)
+ if attr is None:
+ inst = view(request)
+ response = self.mapply(inst, params, keywords)
+ else:
+ inst = view(request)
+ response = self.mapply(getattr(inst, attr), params,
+ keywords)
+ request.__view__ = inst
+ return response
+ mapped_view = _class_view
+ else:
+ def _nonclass_view(context, request):
+ params = (request,) + getattr(request, 'xmlrpc_params', ())
+ keywords = dict(request.params.items())
+ if request.matchdict:
+ keywords.update(request.matchdict)
+ if attr is None:
+ response = self.mapply(view, params, keywords)
+ else:
+ response = self.mapply(getattr(view, attr), params,
+ keywords)
+ return response
+ mapped_view = _nonclass_view
+
+ return mapped_view
+
+ def mapply(self, ob, positional, keyword):
+
+ f = ob
+ im = False
+
+ if hasattr(f, 'im_func'):
+ im = True
+
+ elif not hasattr(f, 'func_defaults'):
+ if hasattr(f, '__call__'):
+ f = f.__call__
+ if hasattr(f, 'im_func'):
+ im = True
+
+ if im:
+ f = f.im_func
+ c = f.func_code
+ defaults = f.func_defaults
+ names = c.co_varnames[1:c.co_argcount]
+ else:
+ defaults = f.func_defaults
+ c = f.func_code
+ names = c.co_varnames[:c.co_argcount]
+
+ nargs = len(names)
+ args = []
+ if positional:
+ positional = list(positional)
+ if len(positional) > nargs:
+ raise TypeError('too many arguments')
+ args = positional
+
+ get = keyword.get
+ nrequired = len(names) - (len(defaults or ()))
+ for index in range(len(args), len(names)):
+ name = names[index]
+ v = get(name, args)
+ if v is args:
+ if index < nrequired:
+ raise TypeError('argument %s was omitted' % name)
+ else:
+ v = defaults[index-nrequired]
+ args.append(v)
+
+ args = tuple(args)
+ return ob(*args)
def parse_xmlrpc_request(request):
""" Deserialize the body of a request from an XML-RPC request
document into a set of params and return a two-tuple. The first
element in the tuple is the method params as a sequence, the
second element in the tuple is the method name."""
if request.content_length > (1 << 23):
- # protect from DOS (> 8MB body)
+ # protect from DOS (> 8MB body), webob will only read CONTENT_LENGTH
+ # bytes when body is accessed, so no worries about getting a
+ # bogus CONTENT_LENGTH header
raise ValueError('Body too large (%s bytes)' % request.content_length)
- params, method = xmlrpclib.loads(request.body)
+ params, method = xmlrpclib.loads(request.body, use_datetime=True)
return params, method
-def xmlrpc_view(wrapped):
- """ This decorator turns functions which accept params and return Python
- structures into functions suitable for use as Pyramid views that speak
- XML-RPC. The decorated function must accept a ``context`` argument and
- zero or more positional arguments (conventionally named ``*params``).
-
- E.g.::
+class xmlrpc_config(view_config):
+ """ This decorator acts almost exactly like
+ :class:`pyramid.view.view_config` but it produces a view configuration
+ which can call an XMLRPC callable rather than a standard Pyramid view
+ callable.
- from pyramid_xmlrpc import xmlrpc_view
+ An XMLRPC callable is one which accepts a variable argument list, which
+ will be populated by items available from the XMLRPC params list,
+ ``request.params`` and ``request.matchdict``. Its first argument must be
+ ``request``, the other arguments will be populated from the available
+ parameters.
- @xmlrpc_view
- def say(context, what):
- if what == 'hello'
- return {'say':'Hello!'}
- else:
- return {'say':'Goodbye!'}
-
- Equates to::
+ E.g.::
- from pyramid_xmlrpc import parse_xmlrpc_request
- from pyramid_xmlrpc import xmlrpc_response
+ from pyramid.xmlrpc import xmlrpc_config
- def say_view(context, request):
- params, method = parse_xmlrpc_request(request)
- return say(context, *params)
+ @xmlrpc_config()
+ def say(request, what):
+ return {'say':what}
+ """
+ def __init__(self, name='', request_type=None, for_=None, permission=None,
+ route_name=None, request_method=None, request_param=None,
+ containment=None, attr=None, wrapper=None, xhr=False,
+ accept=None, header=None, path_info=None, custom_predicates=(),
+ context=None, view_mapper=MapplyViewMapper,
+ renderer='xmlrpc'):
+ self.name = name
+ self.request_type = request_type
+ self.context = context or for_
+ self.permission = permission
+ self.route_name = route_name
+ self.request_method = request_method
+ self.request_param = request_param
+ self.containment = containment
+ self.attr = attr
+ self.wrapper = wrapper
+ self.xhr = xhr
+ self.accept = accept
+ self.header = header
+ self.path_info = path_info
+ self.custom_predicates = custom_predicates
+ self.renderer = renderer
+ self.view_mapper = view_mapper
+ self.custom_predicates = tuple(custom_predicates) + (is_xmlrpc_request,)
+
+def xmlrpc_renderer_factory(info):
+ def _render(value, system):
+ request = system.get('request')
+ if request is not None:
+ if not hasattr(request, 'response_content_type'):
+ request.response_content_type = 'text/xml'
+ if isinstance(value, xmlrpclib.Fault):
+ return xmlrpclib.dumps(value)
+ else:
+ return xmlrpclib.dumps((value,), methodresponse=True)
+ return _render
+
+def xmlrpc_traversal_view(context, request):
+ # duplicate some logic from router to do Zope-style traversal and view
+ # lookup.
+ params, method = request.xmlrpc_params, request.xmlrpc_method
+ names = method.split('.')
+ info = traverse(context, '/'.join(names))
+ inner_context = info['context']
+ view_name = info['view_name']
+ repr_permission = request.registry.settings.get(
+ 'pyramid_xmlrpc.repr_permission', 'view')
+
+ if view_name == '__call__':
+ view_name = ''
+
+ if view_name == '__repr__':
+ def view(context, request):
+ if has_permission(context, request, repr_permission):
+ return repr(context)
+ raise Forbidden('No "%s" permission on context' % repr_permission)
+ else:
+ provides = [IViewClassifier] + map(providedBy,
+ (request, inner_context))
+ reg = request.registry
+ view = reg.adapters.lookup(provides, IView, view_name, default=None)
- def say(context, what):
- if what == 'hello'
- return {'say':'Hello!'}
- else:
- return {'say':'Goodbye!'}
+ if getattr(view, '__original_view__', None) is xmlrpc_traversal_view:
+ view = None
- Note that if you use :class:`~pyramid.view.view_config`, you must
- decorate your view function in the following order for it to be
- recognized by the convention machinery as a view::
+ if view is None:
+ raise NotFound(method)
- from pyramid.view import view_config
- from pyramid_xmlrpc import xmlrpc_view
+ request.__dict__.update(info)
+ return view(inner_context, request)
- @view_config(name='say')
- @xmlrpc_view
- def say(context, what):
- if what == 'hello'
- return {'say':'Hello!'}
- else:
- return {'say':'Goodbye!'}
+def is_xmlrpc_request(context, request):
+ return bool(getattr(request, 'is_xmlrpc', False))
- In other words do *not* decorate it in :func:`~pyramid_xmlrpc.xmlrpc_view`,
- then :class:`~pyramid.view.view_config`; it won't work.
- """
-
- def _curried(context, request):
+def _set_xmlrpc_params(event, override):
+ request = event.request
+ if (request.content_type == 'text/xml'
+ and request.method == 'POST'
+ and not 'soapaction' in request.headers
+ and not 'x-pyramid-avoid-xmlrpc' in request.headers):
params, method = parse_xmlrpc_request(request)
- value = wrapped(context, *params)
- return xmlrpc_response(value)
- _curried.__name__ = wrapped.__name__
- _curried.__grok_module__ = wrapped.__module__
-
- return _curried
-
-class XMLRPCView:
- """A base class for a view that serves multiple methods by XML-RPC.
-
- Subclass and add your methods as described in the documentation.
- """
-
- def __init__(self,context,request):
- self.context = context
- self.request = request
-
- def __call__(self):
- """
- This method de-serializes the XML-RPC request and
- dispatches the resulting method call to the correct
- method on the :class:`~pyramid_xmlrpc.XMLRPCView`
- subclass instance.
-
- .. warning::
- Do not override this method in any subclass if you
- want XML-RPC to continute to work!
-
- """
- params, method = parse_xmlrpc_request(self.request)
- return xmlrpc_response(getattr(self,method)(*params))
+ request.xmlrpc_params, request.xmlrpc_method = params, method
+ request.is_xmlrpc = True
+ if override:
+ request.override_renderer = 'xmlrpc'
+ return True
+ return False
+
+def set_xmlrpc_params_omnipresent(event):
+ return _set_xmlrpc_params(event, override=True)
+
+def set_xmlrpc_params(event):
+ return _set_xmlrpc_params(event, override=False)
+
+def limited(config):
+ """ ``config.include`` target which sets up limited XML-RPC access to an
+ application. Only views configured via ``pyramid_xmlrpc.xmlrpc_config``
+ will be exposed to the world."""
+ config.add_renderer('xmlrpc', xmlrpc_renderer_factory)
+ config.add_subscriber(set_xmlrpc_params, NewRequest)
+
+def omnipresent(config):
+ """ ``config.include`` target which sets up omnipresent (Zope-style)
+ XML-RPC access to an applicaton. Any view configured that uses a
+ renderer will be accessible via XML-RPC. Traversal over the resource
+ tree via xmlrpc will also work."""
+ config.add_renderer('xmlrpc', xmlrpc_renderer_factory)
+ config.add_subscriber(set_xmlrpc_params_omnipresent, NewRequest)
+ config.add_view(
+ xmlrpc_traversal_view,
+ renderer='xmlrpc',
+ custom_predicates=(is_xmlrpc_request,)
+ )
+
+includeme = limited
View
429 pyramid_xmlrpc/tests.py
@@ -1,40 +1,152 @@
import unittest
from pyramid import testing
-class TestXMLRPCMarshal(unittest.TestCase):
- def _callFUT(self, value):
- from pyramid_xmlrpc import xmlrpc_marshal
- return xmlrpc_marshal(value)
-
- def test_xmlrpc_marshal_normal(self):
- data = 1
- marshalled = self._callFUT(data)
- import xmlrpclib
- self.assertEqual(marshalled, xmlrpclib.dumps((data,),
- methodresponse=True))
+class TestMapplyViewMapper(unittest.TestCase):
+ def _makeOne(self, **kw):
+ from pyramid_xmlrpc import MapplyViewMapper
+ return MapplyViewMapper(**kw)
- def test_xmlrpc_marshal_fault(self):
- import xmlrpclib
- fault = xmlrpclib.Fault(1, 'foo')
- data = self._callFUT(fault)
- self.assertEqual(data, xmlrpclib.dumps(fault))
-
-class TestXMLRPResponse(unittest.TestCase):
- def _callFUT(self, value):
- from pyramid_xmlrpc import xmlrpc_response
- return xmlrpc_response(value)
-
- def test_xmlrpc_response(self):
- import xmlrpclib
- data = 1
- response = self._callFUT(data)
- self.assertEqual(response.content_type, 'text/xml')
- self.assertEqual(response.body, xmlrpclib.dumps((1,),
- methodresponse=True))
- self.assertEqual(response.content_length, len(response.body))
- self.assertEqual(response.status, '200 OK')
-
-class TestParseXMLRPCRequest(unittest.TestCase):
+ def test___call__isfunc_no_xmlrpc_params_no_attr(self):
+ data = '123'
+ def view(request):
+ return data
+ context = testing.DummyResource()
+ request = testing.DummyRequest()
+ mapper = self._makeOne()
+ result = mapper(view)(context, request)
+ self.assertEqual(result, data)
+
+ def test___call__isfunc_no_xmlrpc_params_with_attr(self):
+ view = lambda *arg: 'wrong'
+ data = '123'
+ def foo(request):
+ return data
+ view.foo = foo
+ context = testing.DummyResource()
+ request = testing.DummyRequest()
+ mapper = self._makeOne(attr='foo')
+ result = mapper(view)(context, request)
+ self.assertEqual(result, data)
+
+ def test___call__isfunc_with_xmlrpc_params(self):
+ def view(request, a, b):
+ return a, b
+ context = testing.DummyResource()
+ request = testing.DummyRequest()
+ request.xmlrpc_params = ('a', 'b')
+ mapper = self._makeOne()
+ result = mapper(view)(context, request)
+ self.assertEqual(result, ('a', 'b'))
+
+ def test___call__isfunc_with_xmlrpc_params_and_matchdict(self):
+ def view(request, a, b, c=1):
+ return a, b, c
+ context = testing.DummyResource()
+ request = testing.DummyRequest()
+ request.xmlrpc_params = ('a', 'b')
+ request.matchdict = dict(c='2')
+ mapper = self._makeOne()
+ result = mapper(view)(context, request)
+ self.assertEqual(result, ('a', 'b', '2'))
+
+ def test___call__isclass_no_xmlrpc_params_no_attr(self):
+ data = '123'
+ class view(object):
+ def __init__(self, request):
+ pass
+ def __call__(self):
+ return data
+ context = testing.DummyResource()
+ request = testing.DummyRequest()
+ mapper = self._makeOne()
+ result = mapper(view)(context, request)
+ self.assertEqual(result, data)
+ self.assertEqual(request.__view__.__class__, view)
+
+ def test___call__isclass_no_xmlrpc_params_with_attr(self):
+ view = lambda *arg: 'wrong'
+ data = '123'
+ class view(object):
+ def __init__(self, request):
+ pass
+ def index(self):
+ return data
+ context = testing.DummyResource()
+ request = testing.DummyRequest()
+ mapper = self._makeOne(attr='index')
+ result = mapper(view)(context, request)
+ self.assertEqual(result, data)
+ self.assertEqual(request.__view__.__class__, view)
+
+ def test___call__isclass_with_xmlrpc_params(self):
+ class view(object):
+ def __init__(self, request):
+ pass
+ def __call__(self, a, b):
+ return a, b
+ context = testing.DummyResource()
+ request = testing.DummyRequest()
+ request.xmlrpc_params = ('a', 'b')
+ mapper = self._makeOne()
+ result = mapper(view)(context, request)
+ self.assertEqual(result, ('a', 'b'))
+
+ def test___call__isclass_with_xmlrpc_params_and_matchdict(self):
+ class view(object):
+ def __init__(self, request):
+ pass
+ def __call__(self, a, b, c=1):
+ return a, b, c
+ context = testing.DummyResource()
+ request = testing.DummyRequest()
+ request.xmlrpc_params = ('a', 'b')
+ request.matchdict = dict(c='2')
+ mapper = self._makeOne()
+ result = mapper(view)(context, request)
+ self.assertEqual(result, ('a', 'b', '2'))
+
+ def test_mapply_toomanyargs(self):
+ def aview(one, two): pass
+ mapper = self._makeOne()
+ self.assertRaises(TypeError, mapper.mapply, aview, (1, 2, 3), {})
+
+ def test_mapply_all_kwarg_arg_omitted(self):
+ def aview(one, two): pass
+ mapper = self._makeOne()
+ self.assertRaises(TypeError, mapper.mapply, aview, (), dict(one=1))
+
+ def test_mapply_all_kwarg_arg_omitted_with_default(self):
+ def aview(one, two=2):
+ return one, two
+ mapper = self._makeOne()
+ result = mapper.mapply(aview, (), dict(one=1))
+ self.assertEqual(result, (1, 2))
+
+ def test_mapply_all_kwarg(self):
+ def aview(one, two):
+ return one, two
+ mapper = self._makeOne()
+ result = mapper.mapply(aview, (), dict(one=1, two=2))
+ self.assertEqual(result, (1, 2))
+
+ def test_mapply_instmethod(self):
+ mapper = self._makeOne()
+ result = mapper.mapply(self._aview, ('a',), {})
+ self.assertEqual(result, 'a')
+
+ def test_mapply_inst(self):
+ class Foo(object):
+ def __call__(self, a):
+ return a
+ foo = Foo()
+ mapper = self._makeOne()
+ result = mapper.mapply(foo, ('a',), {})
+ self.assertEqual(result, 'a')
+
+ def _aview(self, a):
+ return a
+
+class Test_parse_xmlrpc_request(unittest.TestCase):
def _callFUT(self, request):
from pyramid_xmlrpc import parse_xmlrpc_request
return parse_xmlrpc_request(request)
@@ -55,55 +167,224 @@ def test_toobig(self):
request.content_length = 1 << 24
self.assertRaises(ValueError, self._callFUT, request)
-class TestDecorator(unittest.TestCase):
- def _callFUT(self, unwrapped):
- from pyramid_xmlrpc import xmlrpc_view
- return xmlrpc_view(unwrapped)
+class Test_xmlrpc_config(unittest.TestCase):
+ def _makeOne(self, **kw):
+ from pyramid_xmlrpc import xmlrpc_config
+ return xmlrpc_config(**kw)
- def test_normal(self):
- def unwrapped(context, what):
- return what
- wrapped = self._callFUT(unwrapped)
- self.assertEqual(wrapped.__name__, 'unwrapped')
- self.assertEqual(wrapped.__grok_module__, unwrapped.__module__)
- context = testing.DummyModel()
- request = testing.DummyRequest()
- param = 'what'
+ def test_it_no_custom_predicates(self):
+ inst = self._makeOne()
+ self.assertEqual(len(inst.custom_predicates), 1)
+
+ def test_it_with_custom_predicates(self):
+ inst = self._makeOne(custom_predicates=(1,))
+ self.assertEqual(len(inst.custom_predicates), 2)
+
+class Test_xmlrpc_renderer_factory(unittest.TestCase):
+ def _callFUT(self, info):
+ from pyramid_xmlrpc import xmlrpc_renderer_factory
+ return xmlrpc_renderer_factory(info)
+
+ def test_render_normalvalue(self):
import xmlrpclib
- packet = xmlrpclib.dumps((param,), methodname='__call__')
+ renderer = self._callFUT(None)
request = testing.DummyRequest()
- request.body = packet
- request.content_length = len(packet)
- response = wrapped(context, request)
- self.assertEqual(response.body, xmlrpclib.dumps((param,),
- methodresponse=True))
+ result = renderer('one', dict(request=request))
+ self.assertEqual(result, xmlrpclib.dumps(('one',),
+ methodresponse=True))
-class TestBaseClass(unittest.TestCase):
+ def test_render_Fault(self):
+ import xmlrpclib
+ renderer = self._callFUT(None)
+ request = testing.DummyRequest()
+ fault = xmlrpclib.Fault('1', '1')
+ result = renderer(fault, dict(request=request))
+ self.assertEqual(result, xmlrpclib.dumps(fault))
- def test_normal(self):
+class Test_xmlrpc_traversal_view(unittest.TestCase):
+ def setUp(self):
+ self.config = testing.setUp()
+
+ def tearDown(self):
+ testing.tearDown()
+
+ def _callFUT(self, context, request):
+ from pyramid_xmlrpc import xmlrpc_traversal_view
+ return xmlrpc_traversal_view(context, request)
+
+ def test_no_permission(self):
+ from pyramid.exceptions import Forbidden
+ def theview(request): pass
+ self.config.testing_securitypolicy('fred', permissive=False)
+ self.config.add_view(theview, name='foo', permission='view')
+ request = testing.DummyRequest()
+ request.xmlrpc_params = ()
+ request.xmlrpc_method = 'foo'
+ context = testing.DummyResource()
+ self.assertRaises(Forbidden, self._callFUT, context, request)
+
+ def test_has_permission(self):
+ def theview(request):
+ return '123'
+ self.config.testing_securitypolicy('fred', permissive=True)
+ self.config.add_view(theview, name='foo', permission='view')
+ request = testing.DummyRequest()
+ request.xmlrpc_params = ()
+ request.xmlrpc_method = 'foo'
+ context = testing.DummyResource()
+ result = self._callFUT(context, request)
+ self.assertEqual(result, '123')
+
+ def test_unprotected(self):
+ def theview(request):
+ return '123'
+ self.config.testing_securitypolicy('fred', permissive=False)
+ self.config.add_view(theview, name='foo')
+ request = testing.DummyRequest()
+ request.xmlrpc_params = ()
+ request.xmlrpc_method = 'foo'
+ context = testing.DummyResource()
+ result = self._callFUT(context, request)
+ self.assertEqual(result, '123')
+ self.assertEqual(request.view_name, 'foo')
- from pyramid_xmlrpc import XMLRPCView
- class Test(XMLRPCView):
- def a_method(self,param):
- return param
+ def test_recursion_protection(self):
+ from pyramid.exceptions import NotFound
+ from pyramid_xmlrpc import xmlrpc_traversal_view
+ self.config.testing_securitypolicy('fred', permissive=True)
+ self.config.add_view(xmlrpc_traversal_view, name='foo',
+ permission='view')
+ request = testing.DummyRequest()
+ request.xmlrpc_params = ()
+ request.xmlrpc_method = 'foo'
+ context = testing.DummyResource()
+ self.assertRaises(NotFound, self._callFUT, context, request)
- # set up a request
- param = 'what'
- import xmlrpclib
- packet = xmlrpclib.dumps((param,), methodname='a_method')
+ def test_view_name_is___repr__security_pass(self):
request = testing.DummyRequest()
- request.body = packet
- request.content_length = len(packet)
+ request.xmlrpc_params = ()
+ request.xmlrpc_method = '__repr__'
+ context = testing.DummyResource()
+ expected = repr(context)
+ result = self._callFUT(context, request)
+ self.assertEqual(result, expected)
+
+ def test_view_name_is___repr__security_fail(self):
+ from pyramid.exceptions import Forbidden
+ request = testing.DummyRequest()
+ request.xmlrpc_params = ()
+ request.xmlrpc_method = '__repr__'
+ self.config.testing_securitypolicy(permissive=False)
+ request.registry.settings['pyramid_xmlrpc.repr_permission'] = 'view'
+ context = testing.DummyResource()
+ self.assertRaises(Forbidden, self._callFUT, context, request)
- # instantiate the view
- context = testing.DummyModel()
- instance = Test(context,request)
+ def test_view_name_is___call__(self):
+ def theview(request):
+ return '123'
+ self.config.add_view(theview)
+ request = testing.DummyRequest()
+ request.xmlrpc_params = ()
+ request.xmlrpc_method = '__call__'
+ context = testing.DummyResource()
+ result = self._callFUT(context, request)
+ self.assertEqual(result, '123')
+
+class Test_is_xmlrpc_request(unittest.TestCase):
+ def _callFUT(self, context, request):
+ from pyramid_xmlrpc import is_xmlrpc_request
+ return is_xmlrpc_request(context, request)
+
+ def test_true(self):
+ request = testing.DummyRequest()
+ request.is_xmlrpc = True
+ self.assertEqual(self._callFUT(None, request), True)
- # these are fair game for the methods to use if they want
- self.failUnless(instance.context is context)
- self.failUnless(instance.request is request)
+ def test_false(self):
+ request = testing.DummyRequest()
+ request.is_xmlrpc = False
+ self.assertEqual(self._callFUT(None, request), False)
+
+ def test_missing_false(self):
+ request = testing.DummyRequest()
+ self.assertEqual(self._callFUT(None, request), False)
+
+class Test__set_xmlrpc_params(unittest.TestCase):
+ def _callFUT(self, event, override):
+ from pyramid_xmlrpc import _set_xmlrpc_params
+ return _set_xmlrpc_params(event, override)
+
+ def test_false_not_text_xml(self):
+ request = testing.DummyRequest()
+ request.content_type = 'not/textxml'
+ request.method = 'POST'
+ request.headers = {}
+ event = DummyEvent()
+ event.request = request
+ result = self._callFUT(event, False)
+ self.assertEqual(result, False)
+
+ def test_false_not_post(self):
+ request = testing.DummyRequest()
+ request.content_type = 'text/xml'
+ request.method = 'GET'
+ request.headers = {}
+ event = DummyEvent()
+ event.request = request
+ result = self._callFUT(event, False)
+ self.assertEqual(result, False)
+
+ def test_false_soapaction(self):
+ request = testing.DummyRequest()
+ request.content_type = 'text/xml'
+ request.method = 'POST'
+ request.headers = {'soapaction':'true'}
+ event = DummyEvent()
+ event.request = request
+ result = self._callFUT(event, False)
+ self.assertEqual(result, False)
+
+ def test_false_avoid_xmlrpc(self):
+ request = testing.DummyRequest()
+ request.content_type = 'text/xml'
+ request.method = 'POST'
+ request.headers = {'x-pyramid-avoid-xmlrpc':'true'}
+ event = DummyEvent()
+ event.request = request
+ result = self._callFUT(event, False)
+ self.assertEqual(result, False)
+
+ def test_true_no_override(self):
+ import xmlrpclib
+ request = testing.DummyRequest()
+ request.content_type = 'text/xml'
+ request.method = 'POST'
+ request.headers = {}
+ request.body = xmlrpclib.dumps(('a',))
+ event = DummyEvent()
+ event.request = request
+ result = self._callFUT(event, False)
+ self.assertEqual(result, True)
+ self.assertEqual(request.is_xmlrpc, True)
+ self.assertEqual(request.xmlrpc_params, ('a',))
+ self.assertEqual(request.xmlrpc_method, None)
+ self.assertEqual(getattr(request, 'override_renderer', None), None)
+
+ def test_true_with_override(self):
+ import xmlrpclib
+ request = testing.DummyRequest()
+ request.content_type = 'text/xml'
+ request.method = 'POST'
+ request.headers = {}
+ request.body = xmlrpclib.dumps(('a',))
+ event = DummyEvent()
+ event.request = request
+ result = self._callFUT(event, True)
+ self.assertEqual(result, True)
+ self.assertEqual(request.is_xmlrpc, True)
+ self.assertEqual(request.xmlrpc_params, ('a',))
+ self.assertEqual(request.xmlrpc_method, None)
+ self.assertEqual(request.override_renderer, 'xmlrpc')
- # exercise it
- response = instance()
- self.assertEqual(response.body, xmlrpclib.dumps((param,),
- methodresponse=True))
+class DummyEvent(object):
+ pass

0 comments on commit 1733cae

Please sign in to comment.
Something went wrong with that request. Please try again.