Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

html updates #139

Merged
merged 13 commits into from
This page is out of date. Refresh to see the latest.
3  .gitignore
View
@@ -13,6 +13,9 @@ MANIFEST
dist
build
+# 2to3 cruft
+*.bak
+
# Mac cruft
.DS_Store
2  CHANGELOG.rst
View
@@ -31,6 +31,8 @@ rpclib-2.8.0-beta
* https://github.com/arskom/rpclib/pull/129
* https://github.com/arskom/rpclib/pull/133
* https://github.com/arskom/rpclib/pull/137
+ * https://github.com/arskom/rpclib/pull/138
+ * https://github.com/arskom/rpclib/pull/139
rpclib-2.7.0-beta
-----------------
4 src/rpclib/_base.py
View
@@ -198,6 +198,10 @@ def __init__(self, transport):
self.function = None
"""The callable of the user code."""
+ self.locale = None
+ """The locale the request will use when needed for things like date
+ formatting, html rendering and such."""
+
self.frozen = True
"""When this is set, no new attribute can be added to this class
instance. This is mostly for internal use.
9 src/rpclib/model/_base.py
View
@@ -19,6 +19,7 @@
import rpclib.const.xml_ns
+
"""This module contains the ModelBase class and other building blocks for
defining models.
"""
@@ -34,6 +35,7 @@ def wrapper(cls, element):
return func(cls, element)
return wrapper
+
def nillable_string(func):
"""Decorator that retuns None if input is None."""
@@ -44,6 +46,7 @@ def wrapper(cls, string):
return func(cls, string)
return wrapper
+
def nillable_iterable(func):
"""Decorator that retuns [] if input is None."""
@@ -93,6 +96,12 @@ class Attributes(object):
"""The tag used to add a primitives as child to a complex type in the
xml schema."""
+ translations = {}
+ """A dict that contains locale codes keys and translations of field
+ names to human language as a basestring child as values.
+ """
+
+
class Annotations(object):
"""The class that holds the annotations for the given type."""
9 src/rpclib/model/complex.py
View
@@ -28,10 +28,12 @@
from rpclib.model import nillable_dict
from rpclib.model import nillable_string
-from rpclib.util.odict import odict as TypeInfo
+from rpclib.util.odict import odict
from rpclib.const import xml_ns as namespace
from rpclib.const.suffix import TYPE_SUFFIX
+class TypeInfo(odict):
+ pass
class _SimpleTypeInfoElement(object):
__slots__ = ['path', 'parent', 'type']
@@ -210,6 +212,7 @@ def get_serialization_instance(cls, value):
* A dict of native types
* The native type itself.
"""
+
# if the instance is a list, convert it to a cls instance.
# this is only useful when deserializing method arguments for a client
# request which is the only time when the member order is not arbitrary
@@ -276,7 +279,7 @@ def get_flat_type_info(cls, retval=None):
"""
if retval is None:
- retval = {}
+ retval = TypeInfo()
parent = getattr(cls, '__extends__', None)
if parent != None:
@@ -304,7 +307,7 @@ def get_simple_type_info(cls, hier_delim="_", retval=None, prefix=None,
"""
if retval is None:
- retval = {}
+ retval = TypeInfo()
if prefix is None:
prefix = []
9 src/rpclib/model/primitive.py
View
@@ -214,10 +214,14 @@ def from_string(cls, value):
if sys.version > '3':
String = Unicode
-class AnyUri(Unicode):
- """This is an xml schema type with is a special kind of String."""
+# FIXME: Support this for soft validation
+class AnyUri(String):
+ """A special kind of String type designed to hold an uri."""
__type_name__ = 'anyURI'
+class ImageUri(AnyUri):
+ """A special kind of String that holds an uri of an image."""
+
class Decimal(SimpleModel):
"""The primitive that corresponds to the native python Decimal.
@@ -320,7 +324,6 @@ class Integer(Decimal):
@classmethod
@nillable_string
def to_string(cls, value):
- assert (cls.__length__ is None) or (0 <= value < 2** cls.__length__)
int(value) # sanity check
return str(value)
370 src/rpclib/protocol/html.py
View
@@ -18,7 +18,9 @@
#
"""This module contains the EXPERIMENTAL Html protocol implementation.
-It seeks to eliminate the need for templates.
+It seeks to eliminate the need for html templates.
+
+As you can tell, I haven't figured it all out yet :)
"""
import logging
@@ -33,9 +35,17 @@
from rpclib.model.binary import ByteArray
from rpclib.model.binary import Attachment
from rpclib.model.complex import ComplexModelBase
+from rpclib.model.primitive import ImageUri
from rpclib.protocol import ProtocolBase
from rpclib.util.cdict import cdict
+def translate(cls, locale, default):
+ print locale, cls, cls.Attributes.translations
+ retval = cls.Attributes.translations.get(locale, None)
+ if retval is None:
+ return default
+ return retval
+
def serialize_null(prot, cls, name):
return [ E(prot.child_tag, **{prot.field_name_attr: name}) ]
@@ -88,15 +98,13 @@ def serialize(self, ctx, message):
ctx.out_body_doc, ctx.out_header_doc and ctx.out_document.
"""
- assert message in (self.RESPONSE,)
+ assert message in (self.RESPONSE, )
self.event_manager.fire_event('before_serialize', ctx)
if ctx.out_error is not None:
# FIXME: There's no way to alter soap response headers for the user.
- ctx.out_document = self.serialize_complex_model(
- ctx.out_error.__class__, ctx.out_error,
- ctx.out_error.get_type_name())
+ ctx.out_document = [ctx.out_error.to_string(ctx.out_error)]
else:
# instantiate the result message
@@ -112,7 +120,7 @@ def serialize(self, ctx, message):
ctx.out_header_doc = None
ctx.out_body_doc = self.serialize_impl(result_message_class,
- result_message)
+ result_message, ctx.locale)
ctx.out_document = ctx.out_body_doc
@@ -143,7 +151,7 @@ class HtmlMicroFormat(HtmlBase):
mime_type = 'text/html'
def __init__(self, app=None, validator=None, root_tag='div',
- child_tag='div', field_name_attr='class', skip_depth=0):
+ child_tag='div', field_name_attr='class', skip_depth=0):
"""Protocol that returns the response object as a html microformat. See
https://en.wikipedia.org/wiki/Microformats for more info.
@@ -164,9 +172,9 @@ def __init__(self, app=None, validator=None, root_tag='div',
HtmlBase.__init__(self, app, validator, skip_depth)
- assert root_tag in ('div','span')
- assert child_tag in ('div','span')
- assert field_name_attr in ('class','id')
+ assert root_tag in ('div', 'span')
+ assert child_tag in ('div', 'span')
+ assert field_name_attr in ('class', 'id')
self.__root_tag = root_tag
self.__child_tag = child_tag
@@ -192,11 +200,12 @@ def field_name_attr(self):
return self.__field_name_attr
@nillable_value
- def serialize_model_base(self, cls, value, name='retval'):
- return [ E(self.child_tag, cls.to_string(value), **{self.field_name_attr: name}) ]
+ def serialize_model_base(self, cls, value, locale, name='retval'):
+ return [ E(self.child_tag, cls.to_string(value),
+ **{self.field_name_attr: name}) ]
- def serialize_impl(self, cls, value):
- return self.serialize_complex_model(cls, value, cls.get_type_name())
+ def serialize_impl(self, cls, value, locale):
+ return self.serialize_complex_model(cls, value, cls.get_type_name(), locale)
@nillable_value
def serialize_complex_model(self, cls, value, name='retval'):
@@ -214,46 +223,83 @@ def serialize_complex_model(self, cls, value, name='retval'):
yield '</%s>' % self.root_tag
-class HtmlTable(HtmlBase):
+def HtmlTable(app=None, validator=None, produce_header=True,
+ table_name_attr='class', field_name_attr=None, border=0,
+ fields_as='columns', row_class=None, cell_class=None,
+ header_cell_class=None):
+ """Protocol that returns the response object as a html table.
+
+ The simple flavour is like the HtmlMicroFormatprotocol, but returns data
+ as a html table using the <table> tag.
+
+ :param app: A rpclib.application.Application instance.
+ :param validator: The validator to use. Ignored.
+ :param produce_header: Boolean value to determine whether to show field
+ names in the beginning of the table or not. Defaults to True. Set to
+ False to skip headers.
+ :param table_name_attr: The name of the attribute that will contain the
+ response name of the complex object in the table tag. Set to None to
+ disable.
+ :param field_name_attr: The name of the attribute that will contain the
+ field names of the complex object children for every table cell. Set
+ to None to disable.
+ :param fields_as: One of 'columns', 'rows'.
+ :param row_cell_class: value that goes inside the <tr class="">
+ :param cell_cell_class: value that goes inside the <td class="">
+ :param header_cell_class: value that goes inside the <th class="">
+
+ "Fields as rows" returns one record per table in a table with two
+ columns.
+
+ "Fields as columns" returns one record per table row in a table that
+ has as many columns as field names, just like a regular spreadsheet.
+ """
+
+ if fields_as == 'columns':
+ return _HtmlColumnTable(app, validator, produce_header,
+ table_name_attr, field_name_attr, border,
+ row_class, cell_class, header_cell_class)
+ elif fields_as == 'rows':
+ return _HtmlRowTable(app, validator, produce_header,
+ table_name_attr, field_name_attr, border,
+ row_class, cell_class, header_cell_class)
+
+ else:
+ raise ValueError(fields_as)
+
+class _HtmlTableBase(HtmlBase):
mime_type = 'text/html'
- def __init__(self, app=None, validator=None, header_tag='th',
- table_name_attr='class', field_name_attr=None, border=0):
- """Protocol that returns the response object as a html table.
-
- The simple flavour is like the HtmlMicroFormatprotocol, but returns data
- as a html table using the <table> tag.
-
- :param app: A rpclib.application.Application instance.
- :param validator: The validator to use. Ignored.
- :param header_tag: The header tag used to show field names in the
- beginning of the table. Defaults to 'th'. Set to None to skip headers.
- :param table_name_attr: The name of the attribute that will contain the
- response name of the complex object in the table tag. Set to None to
- disable.
- :param field_name_attr: The name of the attribute that will contain the
- field names of the complex object children for every table cell. Set
- to None to disable.
- """
+ def __init__(self, app, validator, produce_header, table_name_attr,
+ field_name_attr, border, row_class, cell_class, header_class):
HtmlBase.__init__(self, app, validator)
- assert header_tag in ('td','th')
- assert table_name_attr in (None, 'class','id')
- assert field_name_attr in (None, 'class','id')
+ assert table_name_attr in (None, 'class', 'id')
+ assert field_name_attr in (None, 'class', 'id')
- self.__header_tag = header_tag
+ self.__produce_header = produce_header
self.__table_name_attr = table_name_attr
self.__field_name_attr = field_name_attr
self.__border = border
+ self.row_class = row_class
+ self.cell_class = cell_class
+ self.header_class = header_class
+
+ if self.cell_class is not None and field_name_attr == 'class':
+ raise Exception("Either 'cell_class' should be None or "
+ "field_name_attr should be != 'class'")
+ if self.header_class is not None and field_name_attr == 'class':
+ raise Exception("Either 'header_class' should be None or "
+ "field_name_attr should be != 'class'")
@property
def border(self):
return self.__border
@property
- def header_tag(self):
- return self.__header_tag
+ def produce_header(self):
+ return self.__produce_header
@property
def table_name_attr(self):
@@ -263,7 +309,7 @@ def table_name_attr(self):
def field_name_attr(self):
return self.__field_name_attr
- def serialize_impl(self, cls, inst):
+ def serialize_impl(self, cls, inst, locale):
name = cls.get_type_name()
if self.table_name_attr is None:
@@ -271,7 +317,7 @@ def serialize_impl(self, cls, inst):
else:
out_body_doc_header = ['<table %s="%s">' % (self.table_name_attr, name)]
- out_body_doc = self.serialize_complex_model(cls, inst)
+ out_body_doc = self.serialize_complex_model(cls, inst, locale)
out_body_doc_footer = ['</table>']
@@ -281,7 +327,9 @@ def serialize_impl(self, cls, inst):
out_body_doc_footer,
)
- def serialize_complex_model(self, cls, value):
+
+class _HtmlColumnTable(_HtmlTableBase):
+ def serialize_complex_model(self, cls, value, locale):
sti = None
fti = cls.get_flat_type_info(cls)
@@ -289,50 +337,244 @@ def serialize_complex_model(self, cls, value):
if len(fti) == 1:
fti = first_child.get_flat_type_info(first_child)
first_child = iter(fti.values()).next()
+
if len(fti) == 1 and first_child.Attributes.max_occurs > 1:
if issubclass(first_child, ComplexModelBase):
sti = first_child.get_simple_type_info(first_child)
- value = value[0]
+
else:
- raise Exception("Can only serialize Array(...) types")
+ raise NotImplementedError("Can only serialize Array(...) types")
+
+ value = value[0]
+
else:
- raise Exception("Can only serialize single Array(...) return types")
+ raise NotImplementedError("Can only serialize single Array(...) return types")
+
+ tr = {}
+ if self.row_class is not None:
+ tr['class'] = self.row_class
+
+ td = {}
+ if self.cell_class is not None:
+ td['class'] = self.cell_class
- header_row = E.tr()
class_name = first_child.get_type_name()
- if sti is None:
- header_row.append(E.th(class_name))
- else:
- if self.field_name_attr is None:
- for k, v in sti.items():
- header_row.append(E.th(k))
+ if self.produce_header:
+ header_row = E.tr(**tr)
+
+ th = {}
+ if self.header_class is not None:
+ th['class'] = self.header_class
+
+ if sti is None:
+ header_row.append(E.th(class_name, **th))
+
else:
- for k, v in sti.items():
- header_row.append(E.th(k,
- **{self.field_name_attr: k}))
+ if self.field_name_attr is not None:
+ for k, v in sti.items():
+ header_row.append(E.th(k), **th)
+ else:
+ for k, v in sti.items():
+ th[self.field_name_attr] = k
+ header_name = translate(v, locale, k)
+ header_row.append(E.th(header_name), **th)
+
+
+ yield header_row
- yield header_row
if sti is None:
if self.field_name_attr is None:
for val in value:
- yield E.tr(E.td(first_child.to_string(val)), )
+ yield E.tr(E.td(first_child.to_string(val), ** td), ** tr)
else:
for val in value:
- yield E.tr(E.td(first_child.to_string(val)),
- **{self.field_name_attr: class_name})
+ td[self.field_name_attr] = class_name
+ yield E.tr(E.td(first_child.to_string(val), ** td), ** tr)
else:
for val in value:
row = E.tr()
- print val
for k, v in sti.items():
subvalue = val
for p in v.path:
- subvalue = getattr(subvalue, p, "`%s`" % k)
+ subvalue = getattr(subvalue, p, None)
+
+ if subvalue is None:
+ if v.type.Attributes.min_occurs == 0:
+ continue
+ else:
+ subvalue = ""
+ else:
+ subvalue = v.type.to_string(subvalue)
+
+
+ if self.field_name_attr is None:
+ row.append(E.td(subvalue, **td))
+ else:
+ td[self.field_name_attr] = k
+ row.append(E.td(subvalue, **td))
+
+ yield row
+
+
+class _HtmlRowTable(_HtmlTableBase):
+ def serialize_complex_model(self, cls, value, locale):
+ sti = None
+ fti = cls.get_flat_type_info(cls)
+ is_array = False
+
+ first_child = iter(fti.values()).next()
+ if len(fti) == 1:
+ fti = first_child.get_flat_type_info(first_child)
+ first_child_2 = iter(fti.values()).next()
+
+ if len(fti) == 1 and first_child_2.Attributes.max_occurs > 1:
+ if issubclass(first_child_2, ComplexModelBase):
+ sti = first_child_2.get_simple_type_info(first_child_2)
+ is_array = True
+
+ else:
+ if issubclass(first_child, ComplexModelBase):
+ sti = first_child.get_simple_type_info(first_child)
+
+ value = value[0]
+
+ else:
+ raise NotImplementedError("Can only serialize single return types")
+
+ tr = {}
+ if self.row_class is not None:
+ tr['class'] = self.row_class
+
+ td = {}
+ if self.cell_class is not None:
+ td['class'] = self.cell_class
+
+ th = {}
+ if self.header_class is not None:
+ th['class'] = self.header_class
+
+ class_name = first_child.get_type_name()
+ if sti is None:
+ if self.field_name_attr is None:
+ if is_array:
+ for val in value:
+ yield E.tr(E.td(first_child_2.to_string(val)),)
+ else:
+ yield E.tr(E.td(first_child_2.to_string(value)),)
+
+ else:
+ if self.field_name_attr is not None:
+ td[self.field_name_attr] = class_name
+
+ if is_array:
+ for val in value:
+ yield E.tr(E.td(first_child_2.to_string(val), **td), **tr)
+ else:
+ yield E.tr(E.td(first_child_2.to_string(value), **td), **tr)
+
+ else:
+ for k, v in sti.items():
+ row = E.tr(**tr)
+ subvalue = value
+ for p in v.path:
+ subvalue = getattr(subvalue, p, None)
+
+ if subvalue is None:
+ if v.type.Attributes.min_occurs == 0:
+ continue
+ else:
+ subvalue = ""
+ else:
+ subvalue = v.type.to_string(subvalue)
+
+ if issubclass(v.type, ImageUri):
+ subvalue = E.img(src=subvalue)
+
+ if self.produce_header:
+ header_text = translate(v.type, locale, k)
if self.field_name_attr is None:
- row.append(E.td(v.type.to_string(subvalue)))
+ row.append(E.th(header_text, **th))
else:
- row.append(E.td(v.type.to_string(subvalue),
- **{self.field_name_attr: k}))
+ th[self.field_name_attr] = k
+ row.append(E.th(header_text, **th))
+
+ if self.field_name_attr is None:
+ row.append(E.td(subvalue, **td))
+
+ else:
+ td[self.field_name_attr] = k
+ row.append(E.td(subvalue, **td))
+
yield row
+
+
+class _HtmlPage(object):
+ """An EXPERIMENTAL protocol-ish that parses and generates a template for
+ a html file.
+
+ >>> open('temp.html', 'w').write('<html><body><div id="some_div" /></body></html>')
+ >>> t = _HtmlPage('temp.html')
+ >>> t.some_div = "some_text"
+ >>> from lxml import html
+ >>> print html.tostring(t.html)
+ <html><body><div id="some_div">some_text</div></body></html>
+ """
+
+ def __init__(self, file_name):
+ self.__frozen = False
+ self.__file_name = file_name
+ self.__html = html.fromstring(open(file_name, 'r').read())
+
+ self.__ids = {}
+ for elt in self.__html.xpath('//*[@id]'):
+ key = elt.attrib['id']
+ if key in self.__ids:
+ raise ValueError("Don't use duplicate values in id attributes in"
+ "template documents.")
+ self.__ids[key] = elt
+ s = "%r -> %r" % (key, elt)
+ logger.debug(s)
+
+ self.__frozen = True
+
+ @property
+ def file_name(self):
+ return self.__file_name
+
+ @property
+ def html(self):
+ return self.__html
+
+ def __getattr__(self, key):
+ try:
+ return object.__getattr__(self, key)
+
+ except AttributeError:
+ try:
+ return self.__ids[key]
+ except KeyError:
+ raise AttributeError(key)
+
+ def __setattr__(self, key, value):
+ if key.endswith('__frozen') or not self.__frozen:
+ object.__setattr__(self, key, value)
+
+ else:
+ elt = self.__ids.get(key, None)
+ if elt is None:
+ raise AttributeError(key)
+
+ # poor man's elt.clear() version that keeps the attributes
+ children = list(elt)
+ for c in children:
+ elt.remove(c)
+ elt.text = None
+ elt.tail = None
+
+ # set it in.
+ if isinstance(value, basestring):
+ elt.text = value
+ else:
+ elt.append(value)
2  src/rpclib/test/interface/test_bare_interface.py
View
@@ -78,7 +78,7 @@ def some_other_call(ctx, sth):
imports = application.interface.imports
tns = application.interface.get_tns()
smm = application.interface.service_method_map
- print imports
+ print(imports)
assert imports[tns] == set(['1','3','4'])
assert imports['3'] == set(['2'])
4 src/rpclib/test/interface/test_xml_schema.py
View
@@ -49,7 +49,7 @@ class SomeType(ComplexModel):
from lxml import etree
docs = get_schema_documents([SomeType])
- print etree.tostring(docs['tns'], pretty_print=True)
+ print(etree.tostring(docs['tns'], pretty_print=True))
any = docs['tns'].xpath('//xsd:any', namespaces={'xsd': ns.xsd})
assert len(any) == 1
@@ -89,7 +89,7 @@ def some_call(ctx, sth):
imports = application.interface.imports
smm = application.interface.service_method_map
- print smm
+ print(smm)
raise NotImplementedError('test something!')
6 src/rpclib/test/interop/test_soap_client_http_twisted.py
View
@@ -41,7 +41,7 @@ def cb(ret):
def test_python_exception(self):
def eb(ret):
- print ret
+ print(ret)
def cb(ret):
assert False, "must fail: %r" % ret
@@ -50,7 +50,7 @@ def cb(ret):
def test_soap_exception(self):
def eb(ret):
- print type(ret)
+ print(type(ret))
def cb(ret):
assert False, "must fail: %r" % ret
@@ -59,7 +59,7 @@ def cb(ret):
def test_documented_exception(self):
def eb(ret):
- print ret
+ print(ret)
def cb(ret):
assert False, "must fail: %r" % ret
2  src/rpclib/test/model/test_binary.py
View
@@ -35,7 +35,7 @@ def setUp(self):
def test_data(self):
element = etree.Element('test')
Soap11().to_parent_element(ByteArray, self.data, ns_test, element)
- print etree.tostring(element, pretty_print=True)
+ print(etree.tostring(element, pretty_print=True))
element = element[0]
a2 = Soap11().from_element(ByteArray, element)
2  src/rpclib/test/model/test_complex.py
View
@@ -342,7 +342,7 @@ class CM(ComplexModel):
pref = CM.get_namespace_prefix(interface)
type_def = wsdl.get_schema_info(pref).types[CM.get_type_name()]
attribute_def = type_def.find('{%s}attribute' % xml_ns.xsd)
- print etree.tostring(type_def, pretty_print=True)
+ print(etree.tostring(type_def, pretty_print=True))
self.assertIsNotNone(attribute_def)
self.assertEqual(attribute_def.get('name'), 'a')
6 src/rpclib/test/model/test_enum.py
View
@@ -81,7 +81,7 @@ def test_wsdl(self):
elt = etree.fromstring(wsdl)
simple_type = elt.xpath('//xs:simpleType', namespaces=self.app.interface.nsmap)[0]
- print(etree.tostring(elt, pretty_print=True))
+ print((etree.tostring(elt, pretty_print=True)))
print(simple_type)
self.assertEquals(simple_type.attrib['name'], 'DaysOfWeekEnum')
@@ -131,7 +131,7 @@ def test_serialize_complex_array(self):
ret = XmlObject().from_element(Array(DaysOfWeekEnum), elt)
assert days == ret
- print(etree.tostring(elt, pretty_print=True))
+ print((etree.tostring(elt, pretty_print=True)))
pprint(self.app.interface.nsmap)
assert days_xml == [ (e.tag, e.text) for e in
@@ -154,7 +154,7 @@ def test_serialize_simple_array(self):
XmlObject().to_parent_element(Test, t, 'test_namespace', elt)
elt = elt[0]
- print(etree.tostring(elt, pretty_print=True))
+ print((etree.tostring(elt, pretty_print=True)))
ret = XmlObject().from_element(Test, elt)
self.assertEquals(t.days, ret.days)
2  src/rpclib/test/model/test_exception.py
View
@@ -180,7 +180,7 @@ def get_type_name_ns(self, app):
self.assertEqual(len(interface.classes), 1)
c_cls = interface.classes.values()[0]
c_elt = schema.types.values()[0]
- print c_cls, cls
+ print(c_cls, cls)
self.failUnless(c_cls is cls)
self.assertEqual(c_elt.tag, '{%s}complexType' % ns_xsd)
self.assertEqual(c_elt.get('name'), 'Fault')
2  src/rpclib/test/model/test_primitive.py
View
@@ -253,7 +253,7 @@ def test_unicode(self):
def test_null(self):
element = etree.Element('test')
XmlObject().to_parent_element(Null, None, ns_test, element)
- print etree.tostring(element)
+ print(etree.tostring(element))
element = element[0]
self.assertTrue( bool(element.attrib.get('{%s}nil' % ns.xsi)) )
141 src/rpclib/test/protocol/test_html_table.py
View
@@ -22,6 +22,9 @@
import unittest
+from pprint import pformat
+from urllib import urlencode
+
from lxml import html
from rpclib.application import Application
@@ -37,18 +40,36 @@
from rpclib.server.wsgi import WsgiMethodContext
from rpclib.server.wsgi import WsgiApplication
+def _start_response(code, headers):
+ print(code, pformat(headers))
+
+def _call_wsgi_app_kwargs(app, **kwargs):
+ return _call_wsgi_app(app, kwargs.items())
+
+def _call_wsgi_app(app, pairs):
+ out_string = ''.join(app({
+ 'QUERY_STRING': urlencode(pairs),
+ 'PATH_INFO': '/some_call',
+ 'REQUEST_METHOD': 'GET',
+ 'SERVER_NAME': 'rpclib.test',
+ 'SERVER_PORT': '0',
+ 'wsgi.url_scheme': 'http',
+ }, _start_response))
+
+ return out_string
-class TestHtmlTable(unittest.TestCase):
- def test_complex_array(self):
- class CM(ComplexModel):
- i = Integer
- s = String
- class CCM(ComplexModel):
- c = CM
- i = Integer
- s = String
+class CM(ComplexModel):
+ i = Integer
+ s = String
+class CCM(ComplexModel):
+ c = CM
+ i = Integer
+ s = String
+
+class TestHtmlColumnTable(unittest.TestCase):
+ def test_complex_array(self):
class SomeService(ServiceBase):
@srpc(CCM, _returns=Array(CCM))
def some_call(ccm):
@@ -57,26 +78,21 @@ def some_call(ccm):
app = Application([SomeService], 'tns', HttpRpc(), HtmlTable(field_name_attr='class'), Wsdl11())
server = WsgiApplication(app)
- initial_ctx = WsgiMethodContext(server, {
- 'QUERY_STRING': 'ccm_c_s=abc&ccm_c_i=123&ccm_i=456&ccm_s=def',
- 'PATH_INFO': '/some_call',
- 'REQUEST_METHOD': 'GET',
- }, 'some-content-type')
+ out_string = _call_wsgi_app_kwargs(server,
+ ccm_i='456',
+ ccm_s='def',
+ ccm_c_i='123',
+ ccm_c_s='abc',
+ )
- ctx, = server.generate_contexts(initial_ctx)
- server.get_in_object(ctx)
- server.get_out_object(ctx)
- server.get_out_string(ctx)
-
- out_string = ''.join(ctx.out_string)
elt = html.fromstring(out_string)
- print html.tostring(elt, pretty_print=True)
+ print(html.tostring(elt, pretty_print=True))
resp = elt.find_class('some_callResponse')
assert len(resp) == 1
for i in range(len(elt)):
row = elt[i]
- if i == 0:
+ if i == 0: # check for field names in table header
cell = row.findall('th[@class="i"]')
assert len(cell) == 1
assert cell[0].text == 'i'
@@ -94,7 +110,7 @@ def some_call(ccm):
assert cell[0].text == 's'
- else:
+ else: # check for field values in table body
cell = row.findall('td[@class="i"]')
assert len(cell) == 1
assert cell[0].text == '456'
@@ -120,18 +136,77 @@ def some_call(s):
app = Application([SomeService], 'tns', HttpRpc(), HtmlTable(), Wsdl11())
server = WsgiApplication(app)
- initial_ctx = WsgiMethodContext(server, {
- 'QUERY_STRING': 's=1&s=2',
- 'PATH_INFO': '/some_call',
- 'REQUEST_METHOD': 'GET',
- }, 'some-content-type')
+ out_string = _call_wsgi_app(server, (('s', '1'), ('s', '2')) )
+ assert out_string == '<table class="some_callResponse"><tr><th>string</th></tr><tr><td>1</td></tr><tr><td>2</td></tr></table>'
+
+class TestHtmlRowTable(unittest.TestCase):
+ def test_complex(self):
+ class SomeService(ServiceBase):
+ @srpc(CCM, _returns=CCM)
+ def some_call(ccm):
+ return ccm
+
+
+ app = Application([SomeService], 'tns', HttpRpc(),
+ HtmlTable(field_name_attr='class', fields_as='rows'), Wsdl11())
+ server = WsgiApplication(app)
+
+ out_string = _call_wsgi_app_kwargs(server,
+ ccm_c_s='abc', ccm_c_i='123', ccm_i='456', ccm_s='def')
+
+ elt = html.fromstring(out_string)
+ print(html.tostring(elt, pretty_print=True))
+
+ # Here's what this is supposed to return
+ """
+ <table class="some_callResponse">
+ <tr>
+ <th class="i">i</th>
+ <td class="i">456</td>
+ </tr>
+ <tr>
+ <th class="c_i">c_i</th>
+ <td class="c_i">123</td>
+ </tr>
+ <tr>
+ <th class="c_s">c_s</th>
+ <td class="c_s">abc</td>
+ </tr>
+ <tr>
+ <th class="s">s</th>
+ <td class="s">def</td>
+ </tr>
+ </table>
+ """
+
+ resp = elt.find_class('some_callResponse')
+ assert len(resp) == 1
+
+ assert elt.xpath('//th[@class="i"]/text()')[0] == 'i'
+ assert elt.xpath('//td[@class="i"]/text()')[0] == '456'
+
+ assert elt.xpath('//th[@class="c_i"]/text()')[0] == 'c_i'
+ assert elt.xpath('//td[@class="c_i"]/text()')[0] == '123'
+
+ assert elt.xpath('//th[@class="c_s"]/text()')[0] == 'c_s'
+ assert elt.xpath('//td[@class="c_s"]/text()')[0] == 'abc'
+
+ assert elt.xpath('//th[@class="s"]/text()')[0] == 's'
+ assert elt.xpath('//td[@class="s"]/text()')[0] == 'def'
+
+
+ def test_string_array(self):
+ class SomeService(ServiceBase):
+ @srpc(String(max_occurs='unbounded'), _returns=Array(String))
+ def some_call(s):
+ return s
+
+ app = Application([SomeService], 'tns', HttpRpc(), HtmlTable(fields_as='rows'), Wsdl11())
+ server = WsgiApplication(app)
- ctx, = server.generate_contexts(initial_ctx)
- server.get_in_object(ctx)
- server.get_out_object(ctx)
- server.get_out_string(ctx)
+ out_string = _call_wsgi_app(server, (('s', '1'), ('s', '2')) )
+ assert out_string == '<table class="some_callResponse"><tr><td>1</td></tr><tr><td>2</td></tr></table>'
- assert ''.join(ctx.out_string) == '<table class="some_callResponse"><tr><th>string</th></tr><tr><td>1</td></tr><tr><td>2</td></tr></table>'
if __name__ == '__main__':
unittest.main()
2  src/rpclib/test/protocol/test_http.py
View
@@ -180,7 +180,7 @@ def some_call(ccm):
server.get_out_string(ctx)
- print ctx.out_string
+ print(ctx.out_string)
assert ctx.out_string == ["CCM(i=1, c=CM(i=3, s='cs'), s='s')"]
def test_nested_flatten_with_multiple_values_1(self):
24 src/rpclib/test/protocol/test_json.py
View
@@ -162,7 +162,7 @@ def some_call(ccm):
server.get_out_string(ctx)
ret = json.loads(''.join(ctx.out_string))
- print ret
+ print(ret)
assert ret['some_callResponse']['some_callResult']['i'] == 4
assert ret['some_callResponse']['some_callResult']['s'] == '4x'
@@ -253,11 +253,11 @@ def some_call(ecm):
ctx, = server.generate_contexts(initial_ctx)
server.get_in_object(ctx)
server.get_out_object(ctx)
- print ctx.in_object
+ print(ctx.in_object)
server.get_out_string(ctx)
ret = json.loads(''.join(ctx.out_string))
- print ret
+ print(ret)
assert ret['some_callResponse']
assert ret['some_callResponse']['some_callResult']
assert ret['some_callResponse']['some_callResult']['ECM']
@@ -287,7 +287,7 @@ def test_invalid_request(self):
class SomeService(ServiceBase):
@srpc(Integer, String, DateTime)
def yay(i,s,d):
- print i,s,d
+ print(i,s,d)
pass
app = Application([SomeService], 'tns', JsonObject(validator='soft'), JsonObject(), Wsdl11())
@@ -296,15 +296,15 @@ def yay(i,s,d):
initial_ctx = MethodContext(server)
initial_ctx.in_string = ['{"some_call": {"yay": []}}']
ctx, = server.generate_contexts(initial_ctx)
- print ctx.in_error
+ print(ctx.in_error)
assert ctx.in_error.faultcode == 'Client.ResourceNotFound'
- print
+ print()
def test_invalid_string(self):
class SomeService(ServiceBase):
@srpc(Integer, String, DateTime)
def yay(i,s,d):
- print i,s,d
+ print(i,s,d)
pass
app = Application([SomeService], 'tns', JsonObject(validator='soft'),
@@ -322,7 +322,7 @@ def test_invalid_number(self):
class SomeService(ServiceBase):
@srpc(Integer, String, DateTime)
def yay(i,s,d):
- print i,s,d
+ print(i,s,d)
pass
app = Application([SomeService], 'tns', JsonObject(validator='soft'),
@@ -340,7 +340,7 @@ def test_missing_value(self):
class SomeService(ServiceBase):
@srpc(Integer, String, Mandatory.DateTime)
def yay(i,s,d):
- print i,s,d
+ print(i,s,d)
pass
app = Application([SomeService], 'tns', JsonObject(validator='soft'),
@@ -352,7 +352,7 @@ def yay(i,s,d):
ctx, = server.generate_contexts(initial_ctx)
server.get_in_object(ctx)
- print ctx.in_error.faultstring
+ print(ctx.in_error.faultstring)
assert ctx.in_error.faultcode == 'Client.ValidationError'
assert ctx.in_error.faultstring.endswith("frequency constraints.")
@@ -360,7 +360,7 @@ def test_invalid_datetime(self):
class SomeService(ServiceBase):
@srpc(Integer, String, Mandatory.DateTime)
def yay(i,s,d):
- print i,s,d
+ print(i,s,d)
pass
app = Application([SomeService], 'tns', JsonObject(validator='soft'),
@@ -372,7 +372,7 @@ def yay(i,s,d):
ctx, = server.generate_contexts(initial_ctx)
server.get_in_object(ctx)
- print ctx.in_error
+ print(ctx.in_error)
assert ctx.in_error.faultcode == 'Client.ValidationError'
if __name__ == '__main__':
6 src/rpclib/test/test_service.py
View
@@ -165,7 +165,7 @@ def test_multiple_return(self):
MultipleReturnService.get_tns(), sent_xml)
sent_xml = sent_xml[0]
- print(etree.tostring(sent_xml, pretty_print=True))
+ print((etree.tostring(sent_xml, pretty_print=True)))
response_data = self.app.out_protocol.from_element(message_class, sent_xml)
self.assertEquals(len(response_data), 3)
@@ -231,7 +231,7 @@ def call(s):
data.append(s)
def start_response(code, headers):
- print code, headers
+ print(code, headers)
app = Application([Service, AuxService], 'tns', HttpRpc(), HttpRpc())
server = WsgiApplication(app)
@@ -261,7 +261,7 @@ def call(s):
data.add(s + "aux")
def start_response(code, headers):
- print code, headers
+ print(code, headers)
app = Application([Service, AuxService], 'tns', HttpRpc(), HttpRpc())
server = WsgiApplication(app)
2  src/rpclib/util/odict.py
View
@@ -92,7 +92,7 @@ def keys(self):
return self.__list
def update(self, data):
- if isinstance(data, dict):
+ if isinstance(data, (dict, odict)):
data = data.items()
for k, v in data:
Something went wrong with that request. Please try again.