Showing with 46 additions and 8 deletions.
  1. +35 −7 pywps/inout/basic.py
  2. +11 −1 tests/test_inout.py
@@ -26,6 +26,21 @@
SOURCE_TYPE = _SOURCE_TYPE(0, 1, 2, 3)


def _is_textfile(filename):
try:
# use python-magic if available
import magic
is_text = 'text/' in magic.from_file(filename, mime=True)
except ImportError:
# read the first part of the file to check for a binary indicator.
# This method won't detect all binary files.
blocksize = 512
fh = open(filename, 'rb')
is_text = b'\x00' not in fh.read(blocksize)
fh.close()
return is_text


class IOHandler(object):
"""Basic IO class. Provides functions, to accept input data in file,
memory object and stream object and give them out in all three types
@@ -186,16 +201,29 @@ def get_stream(self):
else:
return StringIO(text_type(self.source))

def _openmode(self):
openmode = 'r'
if not PY2:
# in Python 3 we need to open binary files in binary mode.
checked = False
if hasattr(self, 'data_format'):
if self.data_format.encoding == 'base64':
# binary, when the data is to be encoded to base64
openmode += 'b'
checked = True
elif 'text/' in self.data_format.mime_type:
# not binary, when mime_type is 'text/'
checked = True
# when we can't guess it from the mime_type, we need to check the file.
# mimetypes like application/xml and application/json are text files too.
if not checked and not _is_textfile(self.source):
openmode += 'b'
return openmode

def get_data(self):
"""Get source as simple data object"""
if self.source_type == SOURCE_TYPE.FILE:
openmode = 'r'
if (not PY2 and hasattr(self, 'data_format') and
self.data_format.encoding == 'base64'):
# on Python 3, when the data is to be encoded to base64, we
# need to open the file in binary mode
openmode += 'b'
file_handler = open(self.source, mode=openmode)
file_handler = open(self.source, mode=self._openmode())
content = file_handler.read()
file_handler.close()
return content
@@ -14,7 +14,7 @@
from pywps.validator import get_validator
from pywps import NAMESPACES
from pywps.inout.basic import IOHandler, SOURCE_TYPE, SimpleHandler, BBoxInput, BBoxOutput, \
ComplexInput, ComplexOutput, LiteralOutput, LiteralInput
ComplexInput, ComplexOutput, LiteralOutput, LiteralInput, _is_textfile
from pywps.inout import BoundingBoxInput as BoundingBoxInputXML
from pywps.inout.literaltypes import convert, AllowedValue
from pywps._compat import StringIO, text_type
@@ -24,6 +24,8 @@

from lxml import etree

DATA_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')


def get_data_format(mime_type):
return Format(mime_type=mime_type,
@@ -146,6 +148,14 @@ def test_data_bytes(self):
self.iohandler.stream.close()
self.assertEqual(self._value, stream_data, 'Stream obtained')

def test_is_textfile(self):
geotiff = os.path.join(DATA_DIR, 'geotiff', 'dem.tiff')
self.assertFalse(_is_textfile(geotiff))
gml = os.path.join(DATA_DIR, 'gml', 'point.gml')
self.assertTrue(_is_textfile(gml))
geojson = os.path.join(DATA_DIR, 'json', 'point.geojson')
self.assertTrue(_is_textfile(geojson))


class ComplexInputTest(unittest.TestCase):
"""ComplexInput test cases"""