Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions cl_sii/libs/io_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import codecs
import io
from typing import IO


# notes:
# - For streams and modes see 'io.open()'
# - Stream classes have a pretty strange 'typing'/ABC/inheritance/etc arrangement because,
# among others, they are implemented in C.
# - Use `IO[X]` for arguments and `TextIO`/`BinaryIO` for return types (says GVR).
# https://github.com/python/typing/issues/518#issuecomment-350903120


def with_mode_binary(stream: IO) -> bool:
"""
Return whether ``stream`` is a binary stream (i.e. reads bytes).
"""
result = False
try:
result = 'b' in stream.mode
except AttributeError:
if isinstance(stream, (io.RawIOBase, io.BufferedIOBase, io.BytesIO)):
result = True

return result


def with_mode_text(stream: IO) -> bool:
"""
Return whether ``stream`` is a text stream (i.e. reads strings).
"""
result = False
try:
result = 't' in stream.mode
except AttributeError:
if isinstance(stream, (io.TextIOBase, io.TextIOWrapper, io.StringIO)):
result = True

return result


def with_encoding_utf8(text_stream: IO[str]) -> bool:
"""
Return whether ``text_stream`` is a text stream with encoding set to UTF-8.

:raises TypeError: if ``text_stream`` is not a text stream

"""
result = False

if isinstance(text_stream, io.StringIO):
# note: 'StringIO' saves (unicode) strings in memory and therefore doesn't have (or need)
# an encoding, which is fine.
# https://stackoverflow.com/questions/9368865/io-stringio-encoding-in-python3/9368909#9368909
result = True
else:
try:
text_stream_encoding: str = text_stream.encoding # type: ignore
except AttributeError as exc:
raise TypeError("Value is not a text stream.") from exc
if text_stream_encoding is None:
# e.g. the strange case of `tempfile.SpooledTemporaryFile(mode='rt', encoding='utf-8')`
pass
else:
try:
text_stream_encoding_norm = codecs.lookup(text_stream_encoding).name
result = text_stream_encoding_norm == 'utf-8'
except LookupError:
pass

return result
170 changes: 170 additions & 0 deletions tests/test_libs_io_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import io
import pathlib
import tempfile
import unittest

from cl_sii.libs.io_utils import with_encoding_utf8, with_mode_binary, with_mode_text # noqa: F401


class FunctionsTest(unittest.TestCase):

def test_with_encoding_utf8(self):
filename = pathlib.Path(__file__).with_name('test_libs_io_utils-test-file-1.tmp')
filename.touch()

# Binary mode

with open(str(filename), mode='rb') as f:
self.assertTrue(isinstance(f, io.BufferedReader))
with self.assertRaises(TypeError):
with_encoding_utf8(f)

with open(str(filename), mode='wb') as f:
self.assertTrue(isinstance(f, io.BufferedWriter))
with self.assertRaises(TypeError):
with_encoding_utf8(f)

with open(str(filename), mode='w+b') as f:
self.assertTrue(isinstance(f, io.BufferedRandom))
with self.assertRaises(TypeError):
with_encoding_utf8(f)

with io.BytesIO() as f:
self.assertTrue(isinstance(f, io.BytesIO))
with self.assertRaises(TypeError):
with_encoding_utf8(f)

with tempfile.NamedTemporaryFile() as f:
self.assertTrue(isinstance(f, tempfile._TemporaryFileWrapper))
with self.assertRaises(TypeError):
with_encoding_utf8(f)

with tempfile.SpooledTemporaryFile() as f:
self.assertTrue(isinstance(f, tempfile.SpooledTemporaryFile))
with self.assertRaises(TypeError):
with_encoding_utf8(f)

# Text mode - encoding 'utf-8'

with open(str(filename), mode='rt', encoding='utf-8') as f:
self.assertTrue(isinstance(f, io.TextIOWrapper))
self.assertTrue(with_encoding_utf8(f))

with open(str(filename), mode='wt', encoding='utf-8') as f:
self.assertTrue(isinstance(f, io.TextIOWrapper))
self.assertTrue(with_encoding_utf8(f))

with open(str(filename), mode='w+t', encoding='utf-8') as f:
self.assertTrue(isinstance(f, io.TextIOWrapper))
self.assertTrue(with_encoding_utf8(f))

with io.StringIO() as f:
# note: has no encoding
self.assertTrue(isinstance(f, io.StringIO))
self.assertTrue(with_encoding_utf8(f))

with tempfile.NamedTemporaryFile(mode='rt', encoding='utf-8') as f:
self.assertTrue(isinstance(f, tempfile._TemporaryFileWrapper))
self.assertTrue(with_encoding_utf8(f))

with tempfile.SpooledTemporaryFile(mode='rt', encoding='utf-8') as f:
self.assertTrue(isinstance(f, tempfile.SpooledTemporaryFile))
# note: this is a strange case.
self.assertFalse(with_encoding_utf8(f))

# Text mode - encoding 'latin1'

with open(str(filename), mode='rt', encoding='latin1') as f:
self.assertTrue(isinstance(f, io.TextIOWrapper))
self.assertFalse(with_encoding_utf8(f))

with open(str(filename), mode='wt', encoding='latin1') as f:
self.assertTrue(isinstance(f, io.TextIOWrapper))
self.assertFalse(with_encoding_utf8(f))

with open(str(filename), mode='w+t', encoding='latin1') as f:
self.assertTrue(isinstance(f, io.TextIOWrapper))
self.assertFalse(with_encoding_utf8(f))

with tempfile.NamedTemporaryFile(mode='rt', encoding='latin1') as f:
self.assertTrue(isinstance(f, tempfile._TemporaryFileWrapper))
self.assertFalse(with_encoding_utf8(f))

with tempfile.SpooledTemporaryFile(mode='rt', encoding='latin1') as f:
self.assertTrue(isinstance(f, tempfile.SpooledTemporaryFile))
self.assertFalse(with_encoding_utf8(f))

filename.unlink()

def test_with_mode_x(self):
# For the sake of simplicity test here both 'with_mode_binary' and 'with_mode_text'.

filename = pathlib.Path(__file__).with_name('test_libs_io_utils-test-file-2.tmp')
filename.touch()

# Binary mode

with open(str(filename), mode='rb') as f:
self.assertTrue(isinstance(f, io.BufferedReader))
self.assertTrue(with_mode_binary(f))
self.assertFalse(with_mode_text(f))

with open(str(filename), mode='wb') as f:
self.assertTrue(isinstance(f, io.BufferedWriter))
self.assertTrue(with_mode_binary(f))
self.assertFalse(with_mode_text(f))

with open(str(filename), mode='w+b') as f:
self.assertTrue(isinstance(f, io.BufferedRandom))
self.assertTrue(with_mode_binary(f))
self.assertFalse(with_mode_text(f))

with io.BytesIO() as f:
self.assertTrue(isinstance(f, io.BytesIO))
self.assertTrue(with_mode_binary(f))
self.assertFalse(with_mode_text(f))

with tempfile.NamedTemporaryFile() as f:

self.assertTrue(isinstance(f, tempfile._TemporaryFileWrapper))
self.assertTrue(with_mode_binary(f))
self.assertFalse(with_mode_text(f))

with tempfile.SpooledTemporaryFile() as f:
self.assertTrue(isinstance(f, tempfile.SpooledTemporaryFile))
self.assertTrue(with_mode_binary(f))
self.assertFalse(with_mode_text(f))

# Text mode

with open(str(filename), mode='rt') as f:
self.assertTrue(isinstance(f, io.TextIOWrapper))
self.assertFalse(with_mode_binary(f))
self.assertTrue(with_mode_text(f))

with open(str(filename), mode='wt') as f:
self.assertTrue(isinstance(f, io.TextIOWrapper))
self.assertFalse(with_mode_binary(f))
self.assertTrue(with_mode_text(f))

with open(str(filename), mode='w+t') as f:
self.assertTrue(isinstance(f, io.TextIOWrapper))
self.assertFalse(with_mode_binary(f))
self.assertTrue(with_mode_text(f))

with io.StringIO() as f:
self.assertTrue(isinstance(f, io.StringIO))
self.assertFalse(with_mode_binary(f))
self.assertTrue(with_mode_text(f))

with tempfile.NamedTemporaryFile(mode='rt') as f:
self.assertTrue(isinstance(f, tempfile._TemporaryFileWrapper))
self.assertFalse(with_mode_binary(f))
self.assertTrue(with_mode_text(f))

with tempfile.SpooledTemporaryFile(mode='rt') as f:
self.assertTrue(isinstance(f, tempfile.SpooledTemporaryFile))
self.assertFalse(with_mode_binary(f))
self.assertTrue(with_mode_text(f))

filename.unlink()