Skip to content

Commit

Permalink
add TarGenerator, bump version to 0.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
SkyLothar committed Apr 10, 2015
1 parent c5e281e commit bc12490
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 50 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from codecs import open

__version__ = "0.1.2"
__version__ = "0.2.0"
__author__ = "SkyLothar"
__email__ = "allothar@gmail.com"
__url__ = "https://github.com/skylothar/shcmd"
Expand Down
6 changes: 3 additions & 3 deletions shcmd/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
__version__ = "0.1.2"
__version__ = "0.2.0"
__author__ = "SkyLothar"
__email__ = "allothar@gmail.com"
__url__ = "https://github.com/skylothar/shcmd"

import os

__all__ = ["cd", "cd_to", "run", "tar_generator"]
__all__ = ["cd", "cd_to", "run", "TarGenerator"]

from .cmd import cd, cd_to
from .proc import Proc
from .tar import tar_generator
from .tar import TarGenerator
from .utils import expand_args


Expand Down
140 changes: 106 additions & 34 deletions shcmd/tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,117 @@
import tarfile


def tar_generator(file_list):
class TarGenerator(object):
"""
generate tar file using giving file list
:param file_list: a list of file_path or (file_name, file_content) tuple
generate or appending some file to tar file
Usage::
>>> tg = tar_generator([
... "/path/to/a",
... "/path/to/b",
... ("filename", "content")
... ])
>>> len(b"".join(tg))
1024
>>> tg = TarGenerator(old_tar_file)
>>> tg.files_to_add = ["/path/to/file0", "/path/to/file1"]
>>> tg.add_fileobj("obj0", "")
>>> tarfile = b"".join(data for data in tg.generate())
>>> assert tarfile == tg.tar
"""
tar_buffer = io.BytesIO()
tar_obj = tarfile.TarFile(mode="w", fileobj=tar_buffer, dereference=True)

for file_detail in file_list:
last = tar_buffer.tell()
if isinstance(file_detail, str):
tar_obj.add(file_detail)
def __init__(self, origin=None):
"""
:param origin: tar file to appending (default None, generate new tar)
"""
self._tar_buffer = io.BytesIO()

self._tar_obj = tarfile.TarFile(
mode="w",
fileobj=self._tar_buffer,
dereference=True
)

self._generated = False
self._files_to_add = set()
self._ios_to_add = dict()

if origin:
if isinstance(origin, bytes):
origin = io.BytesIO(origin)
origin_tar = tarfile.TarFile(fileobj=origin)
for info in origin_tar.getmembers():
self._tar_obj.addfile(info, origin_tar.extractfile(info))

@property
def files(self):
"""files that will be add to tar file later
should be tuple, list or generator that returns strings
"""
ios_names = [info.name for info in self._ios_to_add.keys()]
return set(self.files_to_add + ios_names)

@property
def files_to_add(self):
return list(self._files_to_add)

@files_to_add.setter
def files_to_add(self, new_fs_list):
self._files_to_add = new_fs_list

def add_fileobj(self, fname, fcontent):
"""add file like object, it will be add to tar file later
:param fname: name in tar file
:param fcontent: content. bytes, string, BytesIO or StringIO
"""
tar_info = tarfile.TarInfo(fname)
if isinstance(fcontent, io.BytesIO):
tar_info.size = len(fcontent.getvalue())
else:
content_name, content = file_detail
content_info = tarfile.TarInfo(content_name)
if isinstance(content, io.BytesIO):
content_info.size = len(content.getvalue())
else:
content_info.size = len(content)
if isinstance(content, str):
content = content.encode("utf8")
content = io.BytesIO(content)
tar_obj.addfile(content_info, content)

tar_buffer.seek(last, os.SEEK_SET)
data = tar_buffer.read()
yield data

tar_obj.close()
yield tar_buffer.read()
tar_info.size = len(fcontent)
if isinstance(fcontent, str):
fcontent = fcontent.encode("utf8")
fcontent = io.BytesIO(fcontent)
self._ios_to_add[tar_info] = fcontent

def generate(self):
"""generate tar file
..Usage::
>>> tarfile = b"".join(data for data in tg.generate())
"""
if self._tar_buffer.tell():
self._tar_buffer.seek(0, 0)
yield self._tar_buffer.read()

for fname in self._files_to_add:
last = self._tar_buffer.tell()
self._tar_obj.add(fname)
self._tar_buffer.seek(last, os.SEEK_SET)
data = self._tar_buffer.read()
yield data

for info, content in self._ios_to_add.items():
last = self._tar_buffer.tell()
self._tar_obj.addfile(info, content)
self._tar_buffer.seek(last, os.SEEK_SET)
data = self._tar_buffer.read()
yield data

self._tar_obj.close()
yield self._tar_buffer.read()
self._generated = True

@property
def generated(self):
return self._generated

@property
def tar(self):
"""tar in bytes format"""
if not self.generated:
for data in self.generate():
pass
return self._tar_buffer.getvalue()

@property
def tar_io(self):
"""tar in file like object"""
return io.BytesIO(self.tar)
2 changes: 2 additions & 0 deletions tests/test-run.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def test_block(self):
proc.block()
tools.eq_(mock_p.mock_calls, [])

tools.eq_(proc.content.decode("utf8"), proc.stdout)

def test_iter_content(self):
# run once
proc = shcmd.run(self.ls_cmd, stream=True)
Expand Down
50 changes: 38 additions & 12 deletions tests/test-tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import tempfile
import uuid

import mock
from nose import tools

import shcmd.tar
Expand All @@ -22,30 +23,55 @@ def setup(self):
f.write(content)
self.ramdom_files[fname] = content

old_tar_buffer = io.BytesIO()
old_tar_content = uuid.uuid4().hex.encode("utf8")
old_tar = tarfile.TarFile(mode="w", fileobj=old_tar_buffer)
tar_info = tarfile.TarInfo("old")
tar_info.size = len(old_tar_content)
old_tar.addfile(tar_info, io.BytesIO(old_tar_content))
old_tar.close()

self.old_tar_content = old_tar_content
self.old_tar = old_tar_buffer.getvalue()

def teardown(self):
for fname in self.ramdom_files.keys():
os.remove(fname)

def test_tar(self):
extras = [
("bytes", b"foo"),
("str", "bar"),
("io", io.BytesIO(b"baz"))
]
file_list = list(self.ramdom_files.keys()) + extras
tg = shcmd.tar.TarGenerator(self.old_tar)
tg.add_fileobj("bytes", b"foo")
tg.add_fileobj("str", "bar")
tg.add_fileobj("io", io.BytesIO(b"baz"))
tg.files_to_add = self.ramdom_files.keys()

tar_data = b"".join(b for b in shcmd.tar.tar_generator(file_list))
tar_obj = tarfile.open(fileobj=io.BytesIO(tar_data))
tar_data = b"".join(b for b in tg.generate())

test_cases = list(self.ramdom_files.items())
test_cases += [
("xbytes", b"foo"), # fake prefix
("xstr", b"bar"),
("xio", b"baz")
("bytes", b"foo"),
("str", b"bar"),
("io", b"baz"),
("old", self.old_tar_content)
]
self._check_content(tar_data, test_cases)

def test_property(self):
tg = shcmd.tar.TarGenerator()
tg.files_to_add = list(self.ramdom_files.keys())

self._check_content(tg.tar, self.ramdom_files.items())

with mock.patch.object(tg, "generate") as mock_g:
tools.eq_(tg.tar, tg.tar_io.read())

tools.eq_(mock_g.mock_calls, [])

def _check_content(self, tar_data, test_cases):
tar_obj = tarfile.open(fileobj=io.BytesIO(tar_data))
for (name, content) in test_cases:
name = name[1:] # tar has not / prefix
if name.startswith("/"):
name = name[1:]
info = tar_obj.getmember(name)
e_content = tar_obj.extractfile(info)
tools.eq_(info.name, name)
Expand Down

0 comments on commit bc12490

Please sign in to comment.