From d070abf5deab5bd7433f132e0760cb539016fec5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Germ=C3=A1n=20Larra=C3=ADn?=
Date: Wed, 12 Jun 2019 20:23:26 -0400
Subject: [PATCH] libs: add module `io_utils`

Include functions:
- `with_mode_binary`
- `with_mode_text`
- `with_encoding_utf8`
---
 cl_sii/libs/io_utils.py     |  86 +++++++++++++++++
 tests/test_libs_io_utils.py | 176 ++++++++++++++++++++++++++++++++++++
 2 files changed, 262 insertions(+)
 create mode 100644 cl_sii/libs/io_utils.py
 create mode 100644 tests/test_libs_io_utils.py

diff --git a/cl_sii/libs/io_utils.py b/cl_sii/libs/io_utils.py
new file mode 100644
index 00000000..a69ce6a3
--- /dev/null
+++ b/cl_sii/libs/io_utils.py
@@ -0,0 +1,86 @@
+import codecs
+import io
+from typing import IO
+
+
+# notes:
+# - For streams and modes see 'io.open()'
+# - Stream classes have a pretty strange 'typing'/ABC/inheritance/etc arrangement because,
+#   among others, they are implemented in C.
+# - Use `IO[X]` for arguments and `TextIO`/`BinaryIO` for return types (says GVR).
+#   https://github.com/python/typing/issues/518#issuecomment-350903120
+# - In 'io.open()' text is the default and flag 't' is optional (`open(path).mode == 'r'`),
+#   so text vs binary must be decided by the absence/presence of flag 'b', never by 't'.
+
+
+def with_mode_binary(stream: IO) -> bool:
+    """
+    Return whether ``stream`` is a binary stream (i.e. reads bytes).
+
+    The ``io`` base classes are checked first because they are unambiguous; attribute ``mode``
+    is only inspected for other file-like objects (e.g. the wrappers of module ``tempfile``).
+
+    :returns: ``False`` if ``stream`` is a text stream or its kind can not be determined
+
+    """
+    if isinstance(stream, (io.RawIOBase, io.BufferedIOBase)):
+        return True
+    if isinstance(stream, io.TextIOBase):
+        return False
+
+    # note: for arbitrary file-like objects 'mode' may be missing or may not be a string.
+    mode = getattr(stream, 'mode', None)
+    return isinstance(mode, str) and 'b' in mode
+
+
+def with_mode_text(stream: IO) -> bool:
+    """
+    Return whether ``stream`` is a text stream (i.e. reads strings).
+
+    The ``io`` base classes are checked first because they are unambiguous; attribute ``mode``
+    is only inspected for other file-like objects (e.g. the wrappers of module ``tempfile``).
+
+    :returns: ``False`` if ``stream`` is a binary stream or its kind can not be determined
+
+    """
+    if isinstance(stream, io.TextIOBase):
+        return True
+    if isinstance(stream, (io.RawIOBase, io.BufferedIOBase)):
+        return False
+
+    # note: flag 't' is optional (e.g. `open(path).mode == 'r'`), thus look for the lack of 'b'.
+    mode = getattr(stream, 'mode', None)
+    return isinstance(mode, str) and 'b' not in mode
+
+
+def with_encoding_utf8(text_stream: IO[str]) -> bool:
+    """
+    Return whether ``text_stream`` is a text stream with encoding set to UTF-8.
+
+    :returns: ``False`` also if the encoding is ``None`` or is not a known codec
+    :raises TypeError: if ``text_stream`` is not a text stream
+
+    """
+    result = False
+
+    if isinstance(text_stream, io.StringIO):
+        # note: 'StringIO' saves (unicode) strings in memory and therefore doesn't have (or need)
+        #   an encoding, which is fine.
+        #   https://stackoverflow.com/questions/9368865/io-stringio-encoding-in-python3/9368909#9368909
+        result = True
+    else:
+        try:
+            text_stream_encoding: str = text_stream.encoding  # type: ignore
+        except AttributeError as exc:
+            raise TypeError("Value is not a text stream.") from exc
+        if text_stream_encoding is None:
+            # e.g. the strange case of `tempfile.SpooledTemporaryFile(mode='rt', encoding='utf-8')`
+            pass
+        else:
+            try:
+                text_stream_encoding_norm = codecs.lookup(text_stream_encoding).name
+                result = text_stream_encoding_norm == 'utf-8'
+            except LookupError:
+                pass
+
+    return result
diff --git a/tests/test_libs_io_utils.py b/tests/test_libs_io_utils.py
new file mode 100644
index 00000000..7a850b3e
--- /dev/null
+++ b/tests/test_libs_io_utils.py
@@ -0,0 +1,176 @@
+import io
+import pathlib
+import tempfile
+import unittest
+
+from cl_sii.libs.io_utils import with_encoding_utf8, with_mode_binary, with_mode_text  # noqa: F401
+
+
+class FunctionsTest(unittest.TestCase):
+
+    def test_with_encoding_utf8(self):
+        filename = pathlib.Path(__file__).with_name('test_libs_io_utils-test-file-1.tmp')
+        filename.touch()
+
+        # Binary mode
+
+        with open(str(filename), mode='rb') as f:
+            self.assertTrue(isinstance(f, io.BufferedReader))
+            with self.assertRaises(TypeError):
+                with_encoding_utf8(f)
+
+        with open(str(filename), mode='wb') as f:
+            self.assertTrue(isinstance(f, io.BufferedWriter))
+            with self.assertRaises(TypeError):
+                with_encoding_utf8(f)
+
+        with open(str(filename), mode='w+b') as f:
+            self.assertTrue(isinstance(f, io.BufferedRandom))
+            with self.assertRaises(TypeError):
+                with_encoding_utf8(f)
+
+        with io.BytesIO() as f:
+            self.assertTrue(isinstance(f, io.BytesIO))
+            with self.assertRaises(TypeError):
+                with_encoding_utf8(f)
+
+        with tempfile.NamedTemporaryFile() as f:
+            self.assertTrue(isinstance(f, tempfile._TemporaryFileWrapper))
+            with self.assertRaises(TypeError):
+                with_encoding_utf8(f)
+
+        with tempfile.SpooledTemporaryFile() as f:
+            self.assertTrue(isinstance(f, tempfile.SpooledTemporaryFile))
+            with self.assertRaises(TypeError):
+                with_encoding_utf8(f)
+
+        # Text mode - encoding 'utf-8'
+
+        with open(str(filename), mode='rt', encoding='utf-8') as f:
+            self.assertTrue(isinstance(f, io.TextIOWrapper))
+            self.assertTrue(with_encoding_utf8(f))
+
+        with open(str(filename), mode='wt', encoding='utf-8') as f:
+            self.assertTrue(isinstance(f, io.TextIOWrapper))
+            self.assertTrue(with_encoding_utf8(f))
+
+        with open(str(filename), mode='w+t', encoding='utf-8') as f:
+            self.assertTrue(isinstance(f, io.TextIOWrapper))
+            self.assertTrue(with_encoding_utf8(f))
+
+        with io.StringIO() as f:
+            # note: has no encoding
+            self.assertTrue(isinstance(f, io.StringIO))
+            self.assertTrue(with_encoding_utf8(f))
+
+        with tempfile.NamedTemporaryFile(mode='rt', encoding='utf-8') as f:
+            self.assertTrue(isinstance(f, tempfile._TemporaryFileWrapper))
+            self.assertTrue(with_encoding_utf8(f))
+
+        with tempfile.SpooledTemporaryFile(mode='rt', encoding='utf-8') as f:
+            self.assertTrue(isinstance(f, tempfile.SpooledTemporaryFile))
+            # note: this is a strange case.
+            self.assertFalse(with_encoding_utf8(f))
+
+        # Text mode - encoding 'latin1'
+
+        with open(str(filename), mode='rt', encoding='latin1') as f:
+            self.assertTrue(isinstance(f, io.TextIOWrapper))
+            self.assertFalse(with_encoding_utf8(f))
+
+        with open(str(filename), mode='wt', encoding='latin1') as f:
+            self.assertTrue(isinstance(f, io.TextIOWrapper))
+            self.assertFalse(with_encoding_utf8(f))
+
+        with open(str(filename), mode='w+t', encoding='latin1') as f:
+            self.assertTrue(isinstance(f, io.TextIOWrapper))
+            self.assertFalse(with_encoding_utf8(f))
+
+        with tempfile.NamedTemporaryFile(mode='rt', encoding='latin1') as f:
+            self.assertTrue(isinstance(f, tempfile._TemporaryFileWrapper))
+            self.assertFalse(with_encoding_utf8(f))
+
+        with tempfile.SpooledTemporaryFile(mode='rt', encoding='latin1') as f:
+            self.assertTrue(isinstance(f, tempfile.SpooledTemporaryFile))
+            self.assertFalse(with_encoding_utf8(f))
+
+        filename.unlink()
+
+    def test_with_mode_x(self):
+        # For the sake of simplicity test here both 'with_mode_binary' and 'with_mode_text'.
+
+        filename = pathlib.Path(__file__).with_name('test_libs_io_utils-test-file-2.tmp')
+        filename.touch()
+
+        # Binary mode
+
+        with open(str(filename), mode='rb') as f:
+            self.assertTrue(isinstance(f, io.BufferedReader))
+            self.assertTrue(with_mode_binary(f))
+            self.assertFalse(with_mode_text(f))
+
+        with open(str(filename), mode='wb') as f:
+            self.assertTrue(isinstance(f, io.BufferedWriter))
+            self.assertTrue(with_mode_binary(f))
+            self.assertFalse(with_mode_text(f))
+
+        with open(str(filename), mode='w+b') as f:
+            self.assertTrue(isinstance(f, io.BufferedRandom))
+            self.assertTrue(with_mode_binary(f))
+            self.assertFalse(with_mode_text(f))
+
+        with io.BytesIO() as f:
+            self.assertTrue(isinstance(f, io.BytesIO))
+            self.assertTrue(with_mode_binary(f))
+            self.assertFalse(with_mode_text(f))
+
+        with tempfile.NamedTemporaryFile() as f:
+
+            self.assertTrue(isinstance(f, tempfile._TemporaryFileWrapper))
+            self.assertTrue(with_mode_binary(f))
+            self.assertFalse(with_mode_text(f))
+
+        with tempfile.SpooledTemporaryFile() as f:
+            self.assertTrue(isinstance(f, tempfile.SpooledTemporaryFile))
+            self.assertTrue(with_mode_binary(f))
+            self.assertFalse(with_mode_text(f))
+
+        # Text mode
+
+        with open(str(filename), mode='r') as f:
+            # note: text mode is implicit (mode has no flag 't').
+            self.assertTrue(isinstance(f, io.TextIOWrapper))
+            self.assertFalse(with_mode_binary(f))
+            self.assertTrue(with_mode_text(f))
+
+        with open(str(filename), mode='rt') as f:
+            self.assertTrue(isinstance(f, io.TextIOWrapper))
+            self.assertFalse(with_mode_binary(f))
+            self.assertTrue(with_mode_text(f))
+
+        with open(str(filename), mode='wt') as f:
+            self.assertTrue(isinstance(f, io.TextIOWrapper))
+            self.assertFalse(with_mode_binary(f))
+            self.assertTrue(with_mode_text(f))
+
+        with open(str(filename), mode='w+t') as f:
+            self.assertTrue(isinstance(f, io.TextIOWrapper))
+            self.assertFalse(with_mode_binary(f))
+            self.assertTrue(with_mode_text(f))
+
+        with io.StringIO() as f:
+            self.assertTrue(isinstance(f, io.StringIO))
+            self.assertFalse(with_mode_binary(f))
+            self.assertTrue(with_mode_text(f))
+
+        with tempfile.NamedTemporaryFile(mode='rt') as f:
+            self.assertTrue(isinstance(f, tempfile._TemporaryFileWrapper))
+            self.assertFalse(with_mode_binary(f))
+            self.assertTrue(with_mode_text(f))
+
+        with tempfile.SpooledTemporaryFile(mode='rt') as f:
+            self.assertTrue(isinstance(f, tempfile.SpooledTemporaryFile))
+            self.assertFalse(with_mode_binary(f))
+            self.assertTrue(with_mode_text(f))
+
+        filename.unlink()