diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..8a59208 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,14 @@ +language: python + +python: + - "3.4" + +install: + - pip install -r tests/requirements.txt + - pip install nose + +script: nosetests --with-coverage --cover-package=shcmd + +after_success: + - pip install coveralls + - coveralls diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..5904a86 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include README.rst LICENSE tests/requirements.txt diff --git a/README.md b/README.md deleted file mode 100644 index af6bab7..0000000 --- a/README.md +++ /dev/null @@ -1,38 +0,0 @@ -SHCMD ~~上海崇明岛~~ -==================== - -Note: Work in Progress. - -This lib aims to provide a Human friendly interface for subprocess. - -If you need piped subprocesses, give [envoy](https://github.com/kennethreitz/envoy) a try. - - -[![Build Status][travis-image]][travis-url] -[![Coverage Status][coverage-image]][coverage-url] -[![Requirements Status][req-status-image]][req-status-url] - - -### Usage -```python -import shcmd - -with shcmd.cd("/tmp"): - # get result directly - assert shcmd.run("pwd") == "/tmp" - # get streamed result packed in a generator - streamed = shcmd.run("ls", stream=True) - for filename in streamed.iter_lines(): - print(filename) - # get full stdout/stderr - print(streamed.stdout) - print(streamed.stderr) -``` - - -[travis-url]: https://travis-ci.org/SkyLothar/shcmd -[travis-image]: https://travis-ci.org/SkyLothar/shcmd.svg?branch=master -[coverage-image]: https://coveralls.io/repos/SkyLothar/shcmd/badge.png -[coverage-url]: https://coveralls.io/r/SkyLothar/shcmd -[req-status-url]: https://requires.io/github/SkyLothar/shcmd/requirements/?branch=master -[req-status-image]: https://requires.io/github/SkyLothar/shcmd/requirements.svg?branch=master diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..abb98d8 --- /dev/null +++ b/README.rst @@ -0,0 +1,35 @@ +SHCMD +----- + +Note: Work in Progress. + +This lib aims to provide a Human friendly interface for subprocess. + +If you need piped subprocesses, give envoy_ a try. + +.. image:: https://travis-ci.org/SkyLothar/shcmd.svg?branch=master + :target: https://travis-ci.org/SkyLothar/shcmd +.. image:: https://travis-ci.org/SkyLothar/shcmd.svg?branch=master + :target: https://travis-ci.org/SkyLothar/shcmd +.. image:: https://img.shields.io/coveralls/SkyLothar/shcmd/master.svg + :target: https://coveralls.io/r/SkyLothar/shcmd + +Usage +^^^^^^ + +.. code-block:: python + + import shcmd + + with shcmd.cd("/tmp"): + # get result directly + assert shcmd.run("pwd") == "/tmp" + # get streamed result packed in a generator + streamed = shcmd.run("ls", stream=True) + for filename in streamed.iter_lines(): + print(filename) + # get full stdout/stderr + print(streamed.stdout) + print(streamed.stderr) + +.. _`envoy`: https://github.com/kennethreitz/envoy diff --git a/setup.py b/setup.py index 6e79086..d18ea9f 100644 --- a/setup.py +++ b/setup.py @@ -1,16 +1,16 @@ # -*- coding: utf-8 -*- -__version__ = "" -__author__ = "" -__email__ = "" -__url__ = "" - - import os import sys from codecs import open +__version__ = "0.1.0" +__author__ = "SkyLothar" +__email__ = "allothar@gmail.com" +__url__ = "https://github.com/skylothar/shcmd" + + try: import setuptools except ImportError: @@ -25,7 +25,7 @@ packages = ["shcmd"] -with open("README.md", "r", "utf-8") as f: +with open("README.rst", "r", "utf-8") as f: readme = f.read() with open("tests/requirements.txt", "r", "utf-8") as f: @@ -35,33 +35,31 @@ setuptools.setup( name="shcmd", version=__version__, - description="", + description="simple command-line wrapper", long_description=readme, author=__author__, author_email=__email__, url=__url__, packages=packages, package_data={ - "": ["LICENSE", "NOTICE"] + "": ["LICENSE"] }, package_dir={ "shcmd": "shcmd" }, include_package_data=True, - install_requires=[], license="Apache 2.0", zip_safe=False, - classifiers=( - "Development Status :: 5 - Production/Stable", + classifiers=[ + "Development Status :: 4 - Beta", "Intended Audience :: Developers", "Natural Language :: English", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python", - "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.3", "Programming Language :: Python :: 3.4" - - ), + ], setup_requires=["nose >= 1.0"], tests_require=tests_require, test_suite="nose.collector" diff --git a/shcmd/__init__.py b/shcmd/__init__.py index e69de29..b77b66d 100644 --- a/shcmd/__init__.py +++ b/shcmd/__init__.py @@ -0,0 +1,29 @@ +__version__ = "0.1.2" +__author__ = "SkyLothar" +__email__ = "allothar@gmail.com" +__url__ = "https://github.com/skylothar/shcmd" + +import os + +__all__ = ["cd", "cd_to", "run", "tar_generator"] + +from .cmd import cd, cd_to +from .proc import Proc +from .tar import tar_generator +from .utils import expand_args + + +DEFAULT_TIMEOUT = 60 + + +def run(cmd, cwd=None, env=None, timeout=None, stream=False): + proc = Proc( + expand_args(cmd), + os.path.realpath(cwd or os.getcwd()), + env=env or {}, + timeout=timeout or DEFAULT_TIMEOUT + ) + + if not stream: + proc.block() + return proc diff --git a/shcmd/cmd.py b/shcmd/cmd.py index a981d9e..39d1252 100644 --- a/shcmd/cmd.py +++ b/shcmd/cmd.py @@ -1,31 +1,21 @@ # -*- coding: utf8 -*- import contextlib +import functools import os -import shlex -from . import compat +@contextlib.contextmanager +def cd(cd_path): + """cd to target dir when running in this block -def split_args(cmd_args): - """Split command args to args list - Returns a list of args + :param cd_path: dir to cd into - :param cmd_args: command args, can be tuple, list or str - """ - if isinstance(cmd_args, (tuple, list)): - args_list = list(cmd_args) - elif compat.is_py2 and isinstance(cmd_args, compat.str): - args_list = shlex.split(cmd_args.encode("utf8")) - elif compat.is_py3 and isinstance(cmd_args, compat.bytes): - args_list = shlex.split(cmd_args.decode("utf8")) - else: - args_list = shlex.split(cmd_args) - return args_list + Usage:: - -@contextlib.contextmanager -def cd(cd_path): + >>> with cd("/tmp"): + ... print("we are in /tmp now") + """ oricwd = os.getcwd() try: os.chdir(cd_path) @@ -34,23 +24,23 @@ def cd(cd_path): os.chdir(oricwd) -class CmdRequest(object): - def __init__(self, cmd, cwd=None): - self._raw = cmd - self._cmd = split_args(cmd) - self._cwd = os.path.realpath(cwd or os.getcwd()) +def cd_to(path): + """make a generator like cd, but use it for function - def __str__(self): - return "".format(self._raw, self.cwd) + Usage:: - @property - def raw(self): - return self._raw + >>> @cd_to("/") + ... def say_where(): + ... print(os.getcwd()) + ... + >>> say_where() + / - @property - def cmd(self): - return self._cmd[:] - - @property - def cwd(self): - return self._cwd + """ + def cd_to_decorator(func): + @functools.wraps(func) + def _cd_and_exec(*args, **kwargs): + with cd(path): + return func(*args, **kwargs) + return _cd_and_exec + return cd_to_decorator diff --git a/shcmd/compat.py b/shcmd/compat.py deleted file mode 100644 index fcfb9b8..0000000 --- a/shcmd/compat.py +++ /dev/null @@ -1,98 +0,0 @@ -# -*- coding: utf-8 -*- - -""" -pythoncompat -""" - -import sys - -# ------- -# Pythons -# ------- - -# Syntax sugar. -_ver = sys.version_info - -#: Python 2.x? -is_py2 = (_ver[0] == 2) - -#: Python 3.x? -is_py3 = (_ver[0] == 3) - -#: Python 3.0.x -is_py30 = (is_py3 and _ver[1] == 0) - -#: Python 3.1.x -is_py31 = (is_py3 and _ver[1] == 1) - -#: Python 3.2.x -is_py32 = (is_py3 and _ver[1] == 2) - -#: Python 3.3.x -is_py33 = (is_py3 and _ver[1] == 3) - -#: Python 3.4.x -is_py34 = (is_py3 and _ver[1] == 4) - -#: Python 2.7.x -is_py27 = (is_py2 and _ver[1] == 7) - -#: Python 2.6.x -is_py26 = (is_py2 and _ver[1] == 6) - -#: Python 2.5.x -is_py25 = (is_py2 and _ver[1] == 5) - - -# --------- -# Platforms -# --------- - - -# Syntax sugar. -_ver = sys.version.lower() - -is_pypy = ('pypy' in _ver) -is_jython = ('jython' in _ver) -is_ironpython = ('iron' in _ver) - -# Assume CPython, if nothing else. -is_cpython = not any((is_pypy, is_jython, is_ironpython)) - -# Windows-based system. -is_windows = 'win32' in str(sys.platform).lower() - -# Standard Linux 2+ system. -is_linux = ('linux' in str(sys.platform).lower()) -is_osx = ('darwin' in str(sys.platform).lower()) -is_hpux = ('hpux' in str(sys.platform).lower()) # Complete guess. -is_solaris = ('solar==' in str(sys.platform).lower()) # Complete guess. - -try: - import simplejson as json -except (ImportError, SyntaxError): - # simplejson does not support Python 3.2, it thows a SyntaxError - # because of u'...' Unicode literals. - import json - -# --------- -# Specifics -# --------- - -if is_py2: - from StringIO import StringIO - - builtin_str = str - bytes = str - str = unicode - basestring = basestring - numeric_types = (int, long, float) - -elif is_py3: - from io import StringIO - - builtin_str = str - str = str - bytes = bytes - basestring = (str, bytes) - numeric_types = (int, float) diff --git a/shcmd/errors.py b/shcmd/errors.py index b34eebb..31e3d45 100644 --- a/shcmd/errors.py +++ b/shcmd/errors.py @@ -1,2 +1,2 @@ -class CmdError(Exception): +class ShCmdError(Exception): pass diff --git a/shcmd/proc.py b/shcmd/proc.py new file mode 100644 index 0000000..6ef71f9 --- /dev/null +++ b/shcmd/proc.py @@ -0,0 +1,204 @@ +# -*- coding: utf8 -*- + +import contextlib +import io +import logging +import subprocess +import threading +import time + +from .errors import ShCmdError + + +logger = logging.getLogger(__name__) + +LINE_CHUNK_SIZE = 1024 + + +def kill_proc(proc, cmd, started_at): + """kill proc if started + returns True if proc is killed actually + """ + if proc.returncode is None: + proc.kill() + + +class Proc(object): + """ + Simple Wrapper around the built-in subprocess module + + use threading.Timer to add timeout option + easy interface for get streamed output of stdout + """ + codec = "utf8" + + def __init__(self, cmd, cwd, env, timeout): + """ + :param cmd: the command + :param cwd: the command should be running under `cwd` dir + :param env: the environment variable + :param timeout: the command should return in `timeout` seconds + + Usage:: + + >>> p = Proc("ls", "/", timeout=1) + >>> p.block() + >>> p.ok + True + >>> type(p.stdout) + + + """ + self._cmd = cmd + self._cwd = cwd + self._env = env + self._timeout = timeout + self._return_code = self._stdout = self._stderr = None + + @property + def cmd(self): + """the proc's command.""" + return self._cmd[:] + + @property + def cwd(self): + """the proc's execuation dir.""" + return self._cwd + + @property + def env(self): + """the proc's environment setting.""" + return self._env.copy() + + @property + def timeout(self): + """the proc's timeout setting.""" + return self._timeout + + @property + def stdout(self): + """proc's stdout.""" + return self._stdout.decode(self.codec) + + @property + def stderr(self): + """proc's stderr.""" + return self._stderr.decode(self.codec) + + @property + def return_code(self): + """proc's return_code""" + return self._return_code + + @property + def content(self): + """the output gathered in stdout in bytes format""" + return self._stdout + + @property + def ok(self): + """`True` if proc's return_code is 0""" + return self.return_code == 0 + + def raise_for_error(self): + """raise `ShCmdError` if the proc's return_code is not 0""" + if not self.ok: + tip = "running {0} @<{1}> error, return code {2}".format( + " ".join(self.cmd), self.cwd, self.return_code + ) + logger.error("{0}\nstdout:{1}\nstderr:{2}\n".format( + tip, self.stdout, self.stderr + )) + raise ShCmdError(tip) + + @contextlib.contextmanager + def _stream(self): + """execute subprocess with timeout + + Usage:: + + >>> with cmd_proc.run_with_timeout() as cmd_proc: + ... stdout, stderr = cmd_proc.communicate() + ... + >>> assert cmd_proc.proc.return_code == 0, "proc exec failed" + + """ + timer = None + try: + proc = subprocess.Popen( + self.cmd, cwd=self.cwd, env=self.env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + timer = threading.Timer( + self.timeout, + kill_proc, [proc, self.cmd, time.time()] + ) + timer.start() + yield proc + finally: + if timer: + timer.cancel() + + def iter_lines(self): + """yields stdout text, line by line.""" + remain = "" + for data in self.iter_content(LINE_CHUNK_SIZE): + line_break_found = data[-1] in (b"\n", b"\r") + lines = data.decode(self.codec).splitlines() + lines[0] = remain + lines[0] + if not line_break_found: + remain = lines.pop() + for line in lines: + yield line + if remain: + yield remain + + def iter_content(self, chunk_size=1): + """ + yields stdout data, chunk by chunk + + :param chunk_size: size of each chunk (in bytes) + """ + if self.return_code is not None: + stdout = io.BytesIO(self._stdout) + data = stdout.read(chunk_size) + while data: + yield data + data = stdout.read(chunk_size) + else: + data = b'' + started_at = time.time() + with self._stream() as proc: + while proc.poll() is None: + chunk = proc.stdout.read(chunk_size) + if not chunk: + continue + yield chunk + data += chunk + + if proc.returncode == -9: + raise subprocess.TimeoutExpired( + proc.args, time.time() - started_at + ) + + chunk = proc.stdout.read(chunk_size) + while chunk: + yield chunk + data += chunk + chunk = proc.stdout.read(chunk_size) + + self._return_code = proc.returncode + self._stderr = proc.stderr.read() + self._stdout = data + + def block(self): + """blocked executation.""" + if self._return_code is None: + proc = subprocess.Popen( + self.cmd, cwd=self.cwd, env=self.env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + self._stdout, self._stderr = proc.communicate(timeout=self.timeout) + self._return_code = proc.returncode diff --git a/shcmd/result.py b/shcmd/result.py deleted file mode 100644 index e4583f0..0000000 --- a/shcmd/result.py +++ /dev/null @@ -1,42 +0,0 @@ -from . import compat - - -ITER_CHUNK_SIZE = 512 - - -class Result(compat.str): - """ - Simple string subclass to allow arbitrary attribute access. - """ - @property - def stdout(self): - return str(self) - - @property - def stderr(self): - return self._stderr - - def ok(self): - return self.retcode == 0 - - def raise_for_retcode(self): - if not self.ok: - raise ValueError() - - def retcode(self): - return self._retcode - - def iter_lines( - self, - chunk_size=ITER_CHUNK_SIZE, - decode_unicode=None, - delimiter=None - ): - pass - - def iter_content(self, chunk_size=1, decode_unicode=False): - pass - - @property - def reason(self): - pass diff --git a/shcmd/tar.py b/shcmd/tar.py new file mode 100644 index 0000000..38afc6d --- /dev/null +++ b/shcmd/tar.py @@ -0,0 +1,47 @@ +import io +import os +import tarfile + + +def tar_generator(file_list): + """ + generate tar file using giving file list + + :param file_list: a list of file_path or (file_name, file_content) tuple + + Usage:: + + >>> tg = tar_generator([ + ... "/path/to/a", + ... "/path/to/b", + ... ("filename", "content") + ... ]) + >>> len(b"".join(tg)) + 1024 + + """ + tar_buffer = io.BytesIO() + tar_obj = tarfile.TarFile(mode="w", fileobj=tar_buffer, dereference=True) + + for file_detail in file_list: + last = tar_buffer.tell() + if isinstance(file_detail, str): + tar_obj.add(file_detail) + else: + content_name, content = file_detail + content_info = tarfile.TarInfo(content_name) + if isinstance(content, io.BytesIO): + content_info.size = len(content.getvalue()) + else: + content_info.size = len(content) + if isinstance(content, str): + content = content.encode("utf8") + content = io.BytesIO(content) + tar_obj.addfile(content_info, content) + + tar_buffer.seek(last, os.SEEK_SET) + data = tar_buffer.read() + yield data + + tar_obj.close() + yield tar_buffer.read() diff --git a/shcmd/utils.py b/shcmd/utils.py new file mode 100644 index 0000000..428bb2a --- /dev/null +++ b/shcmd/utils.py @@ -0,0 +1,16 @@ +# -*- coding: utf8 -*- + +import shlex + + +def expand_args(cmd_args): + """split command args to args list + returns a list of args + + :param cmd_args: command args, can be tuple, list or str + """ + if isinstance(cmd_args, (tuple, list)): + args_list = list(cmd_args) + else: + args_list = shlex.split(cmd_args) + return args_list diff --git a/tests/test-cmd.py b/tests/test-cmd.py index bbea53c..c2bc337 100644 --- a/tests/test-cmd.py +++ b/tests/test-cmd.py @@ -8,50 +8,17 @@ import shcmd.cmd -def test_asc_split_args(): - correct = ["/bin/bash", "echo", "上海崇明岛"] - - # str - result = shcmd.cmd.split_args("/bin/bash echo 上海崇明岛") - nose.tools.eq_(result, correct, "str test failed") - - # unicode - result = shcmd.cmd.split_args(u"/bin/bash echo 上海崇明岛") - nose.tools.eq_(result, correct, "unicode test failed") - - # bytes - result = shcmd.cmd.split_args(u"/bin/bash echo 上海崇明岛".encode("utf8")) - nose.tools.eq_(result, correct, "bytes test failed") - - # list - result = shcmd.cmd.split_args(["/bin/bash", "echo", "上海崇明岛"]) - nose.tools.eq_(result, correct, "list test failed") - - # tuple - result = shcmd.cmd.split_args(("/bin/bash", "echo", "上海崇明岛")) - nose.tools.eq_(result, correct, "tuple test failed") +TMPDIR = os.path.realpath(tempfile.gettempdir()) def test_cd(): - tmpdir = os.path.realpath(tempfile.gettempdir()) oridir = os.getcwd() - nose.tools.ok_(oridir != tmpdir, "tmp dir == curr dir") - with shcmd.cmd.cd(tmpdir): - nose.tools.eq_( - os.path.realpath(os.getcwd()), - tmpdir, - "not cd to dir" - ) - nose.tools.eq_(os.getcwd(), oridir, "not cd back") - - -def test_request(): - cwd = os.path.realpath("tmp") - cmd_req = shcmd.cmd.CmdRequest("/bin/bash eval 'ls'", "tmp") - nose.tools.eq_(cmd_req.cmd, ["/bin/bash", "eval", "ls"]) - nose.tools.eq_(cmd_req.raw, "/bin/bash eval 'ls'") - nose.tools.eq_(cmd_req.cwd, cwd) - nose.tools.eq_( - str(cmd_req), - "".format(cwd) - ) + nose.tools.ok_(oridir != TMPDIR) + with shcmd.cmd.cd(TMPDIR): + nose.tools.eq_(os.path.realpath(os.getcwd()), TMPDIR) + nose.tools.eq_(os.getcwd(), oridir) + + +@shcmd.cmd.cd_to(TMPDIR) +def test_cd_to(): + nose.tools.eq_(TMPDIR, os.getcwd()) diff --git a/tests/test-run.py b/tests/test-run.py new file mode 100644 index 0000000..6bd2c11 --- /dev/null +++ b/tests/test-run.py @@ -0,0 +1,117 @@ +# -*- coding: utf8 -*- + +import os +import tempfile +import subprocess +import uuid + +import mock + +import shcmd +from shcmd.errors import ShCmdError + + +from nose import tools + + +class TestRun(object): + def setup(self): + self.tmp = os.path.realpath(tempfile.gettempdir()) + self.ls_cmd = "ls {0}".format(self.tmp) + self.ramdom_files = [] + for __ in range(4): + fname = os.path.join(self.tmp, uuid.uuid4().hex) + with open(fname, "wt") as f: + f.write("test-run") + self.ramdom_files.append(fname) + + def teardown(self): + for fname in self.ramdom_files: + os.remove(fname) + + def test_block(self): + # run once + proc = shcmd.run(self.ls_cmd) + ls_result = set(name for name in proc.stdout.splitlines()) + random_files = set( + os.path.basename(name) for name in self.ramdom_files + ) + tools.ok_(random_files.issubset(ls_result)) + tools.eq_(proc.return_code, 0) + proc.raise_for_error() + + # run again + with mock.patch("subprocess.Popen") as mock_p: + proc.block() + tools.eq_(mock_p.mock_calls, []) + + def test_iter_content(self): + # run once + proc = shcmd.run(self.ls_cmd, stream=True) + data = b"".join(d for d in proc.iter_content(100)) + ls_result = set(name for name in data.decode("utf8").splitlines()) + random_files = set( + os.path.basename(name) for name in self.ramdom_files + ) + tools.ok_(random_files.issubset(ls_result)) + tools.eq_(proc.return_code, 0) + proc.raise_for_error() + + # run again + with mock.patch("subprocess.Popen") as mock_p: + for d in proc.iter_content(100): + tools.ok_(len(d) <= 100) + tools.eq_(mock_p.mock_calls, []) + + def test_iter_lines(self): + # run once + proc = shcmd.run(self.ls_cmd, stream=True) + ls_result = set(name for name in proc.iter_lines()) + random_files = set( + os.path.basename(name) for name in self.ramdom_files + ) + print(random_files, ls_result) + tools.ok_(random_files.issubset(ls_result)) + tools.eq_(proc.return_code, 0) + proc.raise_for_error() + + # run again + with mock.patch("subprocess.Popen") as mock_p: + ls_result = set(name for name in proc.iter_lines()) + random_files = set( + os.path.basename(name) + for name in self.ramdom_files + ) + tools.ok_(random_files.issubset(ls_result)) + tools.eq_(mock_p.mock_calls, []) + + @tools.raises(ShCmdError) + @mock.patch("subprocess.Popen.communicate") + def test_error(self, mock_c): + mock_c.return_value = b"stdout", b"stderr" + proc = shcmd.run( + "ls -alh /random-dir/random", + cwd="/", + timeout=1, + env=dict(TEST="1") + ) + tools.eq_(proc.cmd, ["ls", "-alh", "/random-dir/random"]) + tools.eq_(proc.cwd, "/") + tools.eq_(proc.timeout, 1) + tools.eq_(proc.env, {"TEST": "1"}) + + tools.eq_(proc.stdout, "stdout") + tools.eq_(proc.stderr, "stderr") + tools.eq_(proc.content, b"stdout") + + proc.raise_for_error() + + @tools.raises(subprocess.TimeoutExpired) + def test_block_timeout(self): + shcmd.run("grep X", timeout=0.1) + + @tools.raises(subprocess.TimeoutExpired) + def test_stream_timeout(self): + proc = shcmd.run("grep X", timeout=0.1, stream=True) + for data in proc.iter_content(1): + tools.eq_(data, b"") diff --git a/tests/test-tar.py b/tests/test-tar.py new file mode 100644 index 0000000..ccccb16 --- /dev/null +++ b/tests/test-tar.py @@ -0,0 +1,52 @@ +# -*- coding: utf8 -*- + +import io +import os +import tarfile +import tempfile +import uuid + +from nose import tools + +import shcmd.tar + + +class TestRun(object): + def setup(self): + tmp = os.path.realpath(tempfile.gettempdir()) + self.ramdom_files = {} + for __ in range(4): + fname = os.path.join(tmp, uuid.uuid4().hex) + with open(fname, "wb") as f: + content = uuid.uuid4().hex.encode("utf8") + f.write(content) + self.ramdom_files[fname] = content + + def teardown(self): + for fname in self.ramdom_files.keys(): + os.remove(fname) + + def test_tar(self): + extras = [ + ("bytes", b"foo"), + ("str", "bar"), + ("io", io.BytesIO(b"baz")) + ] + file_list = list(self.ramdom_files.keys()) + extras + + tar_data = b"".join(b for b in shcmd.tar.tar_generator(file_list)) + tar_obj = tarfile.open(fileobj=io.BytesIO(tar_data)) + + test_cases = list(self.ramdom_files.items()) + test_cases += [ + ("xbytes", b"foo"), # fake prefix + ("xstr", b"bar"), + ("xio", b"baz") + ] + + for (name, content) in test_cases: + name = name[1:] # tar has not / prefix + info = tar_obj.getmember(name) + e_content = tar_obj.extractfile(info) + tools.eq_(info.name, name) + tools.eq_(e_content.read(), content) diff --git a/tests/test-utils.py b/tests/test-utils.py new file mode 100644 index 0000000..1b01c47 --- /dev/null +++ b/tests/test-utils.py @@ -0,0 +1,21 @@ +# -*- coding: utf8 -*- + +import nose + +import shcmd.utils + + +def test_asc_expand_args(): + correct = ["/bin/bash", "echo", "上海崇明岛"] + + # str + result = shcmd.utils.expand_args("/bin/bash echo 上海崇明岛") + nose.tools.eq_(result, correct) + + # list + result = shcmd.utils.expand_args(["/bin/bash", "echo", "上海崇明岛"]) + nose.tools.eq_(result, correct) + + # tuple + result = shcmd.utils.expand_args(("/bin/bash", "echo", "上海崇明岛")) + nose.tools.eq_(result, correct) diff --git a/tox.ini b/tox.ini index b1adfea..e313e40 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py27, py34, flake8 +envlist = py34, flake8 skipsdist = True [testenv]