Skip to content

Commit

Permalink
Version 0.5.1 (#14)
Browse files Browse the repository at this point in the history
Version 0.5.1

* Adding default argument to confignamespace's int, float, list and boolean methods
* Adding change_logger_levels
* Changing version location to init so it can be accessed properly
* Changing protected_keys in Namespace to be hidden from documentation
* Changing linux only tests to be in their own class
* Breaking change: keyword arg position for confignamespace.list now has 'default' as first kwarg
  • Loading branch information
cdgriffith committed Oct 14, 2016
1 parent 275be6e commit ee3c9f1
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 79 deletions.
10 changes: 10 additions & 0 deletions CHANGES.rst
@@ -1,6 +1,16 @@
Changelog
=========

Version 0.5.1
-------------

- Adding default argument to confignamespace's int, float, list and boolean methods
- Adding change_logger_levels
- Changing __version__ location to __init__ so it can be accessed properly
- Changing protected_keys in Namespace to be hidden from documentation
- Changing linux only tests to be in their own class
- Breaking change: keyword arg position for confignamespace.list now has 'default' as first kwarg

Version 0.5.0
-------------

Expand Down
3 changes: 2 additions & 1 deletion reusables/__init__.py
Expand Up @@ -6,4 +6,5 @@
from .browser import *
from .wrappers import *


__author__ = "Chris Griffith"
__version__ = "0.5.1"
13 changes: 13 additions & 0 deletions reusables/log.py
Expand Up @@ -135,3 +135,16 @@ def remove_all_handlers(logger):
remove_file_handlers(logger)
logger.handlers = []


def change_logger_levels(logger, level=_logging.DEBUG):
"""
Go through the logger and handlers and update their levels to the
one specified.
:param logger: logging object to modify
:param level: logging level to set at (10=Debug, 20=Info, 30=Warn, 40=Error)
:return:
"""
logger.setLevel(level)
for handler in logger.handlers:
handler.level = level
63 changes: 44 additions & 19 deletions reusables/namespace.py
Expand Up @@ -21,7 +21,7 @@ class Namespace(dict):
- namespace['spam'].eggs
"""

protected_keys = dir({}) + ['from_dict', 'to_dict']
_protected_keys = dir({}) + ['from_dict', 'to_dict']

def __init__(self, *args, **kwargs):
if len(args) == 1 and isinstance(args[0], dict):
Expand All @@ -47,7 +47,7 @@ def __getattr__(self, item):
raise AttributeError(item)

def __setattr__(self, key, value):
if key in self.protected_keys:
if key in self._protected_keys:
raise AttributeError("Key name '{0}' is protected".format(key))
if isinstance(value, dict):
value = self.__class__(**value)
Expand Down Expand Up @@ -128,8 +128,8 @@ class ConfigNamespace(Namespace):
"""

protected_keys = dir({}) + ['from_dict', 'to_dict', 'bool', 'int', 'float',
'list', 'getboolean', 'getfloat', 'getint']
_protected_keys = dir({}) + ['from_dict', 'to_dict', 'bool', 'int', 'float',
'list', 'getboolean', 'getfloat', 'getint']

def __getattr__(self, item):
"""Config file keys are stored in lower case, be a little more
Expand All @@ -139,13 +139,20 @@ def __getattr__(self, item):
except AttributeError:
return super(ConfigNamespace, self).__getattr__(item.lower())

def bool(self, item):
def bool(self, item, default=None):
""" Return value of key as a boolean
:param item: key of value to transform
:param default: value to return if item does not exist
:return: approximated bool of value
"""
item = self.__getattr__(item)
try:
item = self.__getattr__(item)
except AttributeError as err:
if default is not None:
return default
raise err

if isinstance(item, (bool, int)):
return bool(item)

Expand All @@ -155,34 +162,52 @@ def bool(self, item):

return True if item else False

def int(self, item):
def int(self, item, default=None):
""" Return value of key as an int
:param item: key of value to transform
:param default: value to return if item does not exist
:return: int of value
"""
item = self.__getattr__(item)
try:
item = self.__getattr__(item)
except AttributeError as err:
if default is not None:
return default
raise err
return int(item)

def float(self, item):
def float(self, item, default=None):
""" Return value of key as a float
:param item: key of value to transform
:param default: value to return if item does not exist
:return: float of value
"""
item = self.__getattr__(item)
try:
item = self.__getattr__(item)
except AttributeError as err:
if default is not None:
return default
raise err
return float(item)

def list(self, item, spliter=",", strip=True, mod=None):
def list(self, item, default=None, spliter=",", strip=True, mod=None):
""" Return value of key as a list
:param item: key of value to transform
:param mod: function to map against list
:param default: value to return if item does not exist
:param spliter: character to split str on
:param strip: clean the list with the `strip`
:param mod: function to map against list
:return: list of items
"""
item = self.__getattr__(item)
try:
item = self.__getattr__(item)
except AttributeError as err:
if default is not None:
return default
raise err
if strip:
item = item.lstrip("[").rstrip("]")
out = [x.strip() if strip else x for x in item.split(spliter)]
Expand All @@ -192,11 +217,11 @@ def list(self, item, spliter=",", strip=True, mod=None):

# loose configparser compatibility

def getboolean(self, item):
return self.bool(item)
def getboolean(self, item, default=None):
return self.bool(item, default)

def getint(self, item):
return self.int(item)
def getint(self, item, default=None):
return self.int(item, default)

def getfloat(self, item):
return self.float(item)
def getfloat(self, item, default=None):
return self.float(item, default)
55 changes: 52 additions & 3 deletions reusables/reusables.py
Expand Up @@ -11,14 +11,12 @@
import tempfile as _tempfile
import csv as _csv
import json as _json
import subprocess as _subprocess

from .namespace import Namespace, ConfigNamespace
from .log import get_logger

__author__ = "Chris Griffith"
__version__ = "0.5.0"

version = __version__
python_version = _sys.version_info[0:3]
version_string = ".".join([str(x) for x in python_version])
current_root = _os.path.abspath(".")
Expand Down Expand Up @@ -667,3 +665,54 @@ def touch(path):
"""
with open(path, 'a'):
_os.utime(path, None)


def run(command, input=None, stdout=_subprocess.PIPE, stderr=_subprocess.PIPE,
timeout=None, **kwargs):
"""
Cross platform compatible subprocess with CompletedProcess return.
:param command: command to run, str if shell=True otherwise must be list
:param input: send something `communicate`
:param stdout: PIPE or None
:param stderr: PIPE or None
:param timeout: max time to wait for command to complete
:param kwargs: additional arguments to pass to Popen
:return: CompletedProcess class
"""
if _sys.version_info >= (3, 5):
return _subprocess.run(command, input=input, stdout=stdout,
stderr=stderr, timeout=timeout, **kwargs)

class CompletedProcess(object):
"""A backwards compatible clone of subprocess.CompletedProcess"""

def __init__(self, args, returncode, stdout=None, stderr=None):
self.args = args
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr

def __repr__(self):
args = ['args={0!r}'.format(self.args),
'returncode={0!r}'.format(self.returncode),
'stdout={0!r}'.format(self.stdout) if self.stdout else '',
'stderr={0!r}'.format(self.stderr) if self.stderr else '']
return "{0}({1})".format(type(self).__name__,
', '.join(filter(None, args)))

def check_returncode(self):
if self.returncode:
raise _subprocess.CalledProcessError(self.returncode,
self.args,
self.stdout,
self.stderr)

proc = _subprocess.Popen(command, stdout=stdout, stderr=stderr, **kwargs)
if PY3:
out, err = proc.communicate(input=input, timeout=timeout)
else:
if timeout:
raise NotImplementedError("Timeout is only available on python 3")
out, err = proc.communicate(input=input)
return CompletedProcess(command, proc.returncode, out, err)
3 changes: 1 addition & 2 deletions reusables/wrappers.py
Expand Up @@ -53,9 +53,8 @@ def reuse(func):
so that any module calling the default function will act as if it's a
partial, and then may unknowingly change what the partial becomes!
"""
import functools

@functools.wraps(func)
@_wraps(func)
def wrapper(*args, **kwargs):
global _reuse_cache
cache = _reuse_cache.get(func.__name__, dict(args=[], kwargs={}))
Expand Down
108 changes: 56 additions & 52 deletions test/test_reuse.py
Expand Up @@ -7,6 +7,7 @@
import tarfile
import tempfile
import reusables
import subprocess

test_root = os.path.abspath(os.path.dirname(__file__))
data_dr = os.path.join(test_root, "data")
Expand Down Expand Up @@ -39,31 +40,6 @@ def tearDownClass(cls):
if os.path.exists(test_structure):
shutil.rmtree(test_structure)

def test_join_path_clean(self):
if not reusables.nix_based:
self.skipTest("Linux based test")
resp = reusables.join_paths('/test/', 'clean/', 'path')
assert resp == '/test/clean/path', resp

def test_join_path_dirty(self):
if not reusables.nix_based:
self.skipTest("Linux based test")
resp = reusables.join_paths('/test/', '/dirty/', ' path.file ')
assert resp == '/test/dirty/path.file', resp

def test_join_path_clean_strict(self):
if not reusables.nix_based:
self.skipTest("Linux based test")
resp = reusables.join_paths('/test/', 'clean/', 'path/')
assert resp == '/test/clean/path/', resp

def test_join_root(self):
if not reusables.nix_based:
self.skipTest("Linux based test")
resp = reusables.join_root('clean/')
path = os.path.abspath(os.path.join(".", 'clean/'))
assert resp == path, (resp, path)

def test_get_config_dict(self):
resp = reusables.config_dict(os.path.join(test_root, 'test_config.cfg'))
assert resp['Section1']['key 1'] == 'value 1'
Expand Down Expand Up @@ -104,22 +80,6 @@ def test_safe_good_filename(self):
resp = reusables.safe_filename(infilename)
assert resp == infilename, resp

def test_safe_bad_path(self):
if not reusables.nix_based:
self.skipTest("Linux based test")
path = "/var/lib\\/test/p?!ath/fi\0lename.txt"
expected = "/var/lib_/test/p__ath/fi_lename.txt"
resp = reusables.safe_path(path)
assert not [x for x in ("!", "?", "\0", "^", "&", "*") if x in resp], resp
assert resp == expected, resp

def test_safe_good_path(self):
if not reusables.nix_based:
self.skipTest("Linux based test")
path = "/var/lib/test/path/filename.txt"
resp = reusables.safe_path(path)
assert resp == path, resp

def test_sorting(self):
al = [{"name": "a"}, {"name": "c"}, {"name": "b"}]
resp = reusables.sort_by(al, "name")
Expand Down Expand Up @@ -362,6 +322,61 @@ def test_config_reader_bad(self):
else:
assert False

def test_run(self):
cl = reusables.run('echo test', shell=True, stderr=None)
try:
cl.check_returncode()
except subprocess.CalledProcessError:
pass
assert cl.stdout == (b'test\n' if reusables.nix_based else b'test\r\n'), cl

outstr = "CompletedProcess(args='echo test', returncode=0, stdout={0}'test{1}\\n')".format('b' if reusables.PY3 else '', '\\r' if reusables.win_based else '')

assert str(cl) == outstr, "{0} != {1}".format(str(cl), outstr)

try:
cl2 = reusables.run('echo test', shell=True, timeout=5)
except NotImplementedError:
if reusables.PY3:
raise AssertionError("Should only happen on PY2")
pass
else:
if reusables.PY2:
raise AssertionError("Timeout should not have worked for PY2")


if reusables.nix_based:
class TestReuseLinux(unittest.TestCase):
def test_safe_bad_path(self):
path = "/var/lib\\/test/p?!ath/fi\0lename.txt"
expected = "/var/lib_/test/p__ath/fi_lename.txt"
resp = reusables.safe_path(path)
assert not [x for x in ("!", "?", "\0", "^", "&", "*") if
x in resp], resp
assert resp == expected, resp

def test_safe_good_path(self):
path = "/var/lib/test/path/filename.txt"
resp = reusables.safe_path(path)
assert resp == path, resp

def test_join_path_clean(self):
resp = reusables.join_paths('/test/', 'clean/', 'path')
assert resp == '/test/clean/path', resp

def test_join_path_dirty(self):
resp = reusables.join_paths('/test/', '/dirty/', ' path.file ')
assert resp == '/test/dirty/path.file', resp

def test_join_path_clean_strict(self):
resp = reusables.join_paths('/test/', 'clean/', 'path/')
assert resp == '/test/clean/path/', resp

def test_join_root(self):
resp = reusables.join_root('clean/')
path = os.path.abspath(os.path.join(".", 'clean/'))
assert resp == path, (resp, path)


if reusables.win_based:
class TestReuseWindows(unittest.TestCase):
Expand All @@ -375,18 +390,7 @@ def test_win_join_path_dirty(self):
resp = reusables.join_paths('C:\\test\\', 'D:\\dirty', ' path.file ')
assert resp == 'D:\\dirty\\path.file', resp

def test_win_join_path_clean_strict(self):
resp = reusables.join_paths('C:\\test', 'clean\\', 'path', strict=True).rstrip("\\")
assert resp == 'C:\\test\\clean\\path', resp

def test_win_join_path_dirty_strict(self):
resp = reusables.join_paths('C:\\test\\', 'D:\\dirty',
' path.file ', strict=True)
assert resp == 'D:\\dirty\\ path.file ', resp

def test_win_join_root(self):
if not reusables.win_based:
self.skipTest("Windows based test")
resp = reusables.join_root('clean\\')
path = os.path.abspath(os.path.join(".", 'clean\\'))
assert resp == path, (resp, path)
Expand Down

0 comments on commit ee3c9f1

Please sign in to comment.