Skip to content

Commit

Permalink
Overhauled pshell.concatenate
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Oct 8, 2018
1 parent 12ec395 commit ae5106b
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 100 deletions.
6 changes: 4 additions & 2 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,7 @@ Fork from Legal & General's landg.bash.

API changes:

- Merged ``gzip.open`` into :func:`~pyshell.open`.
Added support for bzip2 and lzma compression.
- Merged ``gzip.open`` into :func:`~pshell.open`.
Added support for bzip2 and lzma compression.
- Changed parameters of :func:`~pshell.concatenate`.
By default, the output file is deleted if it already exists.
95 changes: 68 additions & 27 deletions pshell/manipulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,80 @@
__all__ = ('concatenate', )


def concatenate(input_fnames, output_fname, mode='w', **kwargs):
    r"""Concatenate files. Python equivalent of
    :command:`cat input_fnames[0] input_fnames[1] ... > output_fname`.

    :param input_fnames:
        sequence of str. Paths to one or more input files, to be appended
        one after the other to the output.
    :param str output_fname:
        Path to the output file, which may or may not already exist.
    :param str mode:
        Mode for opening the output file e.g. 'w' or 'ab'.
        Defaults to text mode unless 'b' is explicitly declared.
    :param kwargs:
        Passed verbatim to all the underlying :func:`pshell.open` calls.
        Among other things, this means that this function can transparently
        deal with compressed files by inspecting their extension; different
        files can use different compression algorithms as long as you use
        ``compression='auto'`` (the default).

    If the output is opened in text mode, the inputs will be too; if any file
    does not terminate with ``\n``, it will be added. If the output is opened
    in binary mode, the inputs will be too; no extra bytes will be added
    between files.
    """
    logging.info("Appending files: %s to: %s", input_fnames, output_fname)

    # The presence of 'b' in the output mode selects the implementation;
    # both receive the caller's mode and kwargs untouched.
    if 'b' in mode:
        _concatenate_binary(input_fnames, output_fname, mode, **kwargs)
    else:
        _concatenate_text(input_fnames, output_fname, mode, **kwargs)


def _concatenate_binary(input_fnames, output_fname, mode, **kwargs):
    """Binary backend of concatenate.

    Opens the output with the caller's ``mode`` and copies every input,
    opened as 'rb', into it in 64 KiB chunks. Nothing is written between
    one input and the next. ``kwargs`` are forwarded to every open call.
    """
    with pshell_open(output_fname, mode, **kwargs) as ofh:
        for fname in input_fnames:
            with pshell_open(fname, 'rb', **kwargs) as ifh:
                while True:
                    chunk = ifh.read(65536)
                    # An empty bytes object marks end of file
                    if chunk == b'':
                        break
                    ofh.write(chunk)


def _concatenate_text(input_fnames, output_fname, mode, **kwargs):
    """Implementation of concatenate for text files.

    Every line read from the inputs is written to the output stripped of its
    trailing ``\\r``/``\\n`` characters and followed by a single ``\\n``, so
    each input contributes newline-terminated lines only.

    When ``mode`` contains 'a' and the existing output does not already end
    with ``\\n``, a ``\\n`` is written before the first input so that the new
    contents start on their own line.

    :param input_fnames:
        sequence of str. Paths of the input files, opened with mode 'r'.
    :param str output_fname:
        Path of the output file.
    :param str mode:
        Mode for opening the output file.
    :param kwargs:
        Forwarded to every open call. ``newline``, ``encoding`` and
        ``errors`` are withheld from the binary-mode peek at the output.
    """
    prepend_newline = False
    if 'a' in mode:
        # Check if the last line of the existing output ends with a \n.
        # The peek is done in binary mode, so discard from kwargs all
        # parameters that are only applicable to text mode.
        kwargs_peek = {
            key: value for key, value in kwargs.items()
            if key not in ('newline', 'encoding', 'errors')
        }
        try:
            with pshell_open(output_fname, 'rb', **kwargs_peek) as fh:
                # Read last character (whence=2: relative to end of file)
                fh.seek(-1, 2)
                last_char = fh.read()
        except FileNotFoundError as e:
            logging.info("%s", e)
        except OSError:
            # Empty file
            logging.info("Empty file: %s", output_fname)
        else:
            # Won't work with \r terminator, which nobody cares about
            # anyway. We really only care about \n (Unix and MacOSX)
            # and \r\n (Windows).
            # An empty read means there is no content to terminate: not
            # every file object raises when seeking before the start of an
            # empty stream (NOTE(review): observed with the stdlib
            # decompressing readers - confirm for pshell.open), and an empty
            # output must not receive a leading blank line.
            prepend_newline = last_char not in (b'', b'\n')

    with pshell_open(output_fname, mode, **kwargs) as ofh:
        if prepend_newline:
            ofh.write('\n')
        for fname in input_fnames:
            with pshell_open(fname, 'r', **kwargs) as ifh:
                for line in ifh:
                    # Normalise whatever terminator the line carries (or
                    # the lack of one, on the last line) to a single \n
                    ofh.write(line.rstrip('\r\n'))
                    ofh.write('\n')
169 changes: 99 additions & 70 deletions pshell/tests/test_manipulate.py
Original file line number Diff line number Diff line change
@@ -1,82 +1,111 @@
import pytest
import pshell as sh


BLURB = 'helloworld'


@pytest.mark.parametrize('newline', ['\n', '\r\n'])
def test_concatenate_t1(tmpdir, newline):
    # Output file already exists and is non-empty. Files end without a newline.
    # Test compression: output and inputs use different file extensions.
    out = '%s/out.gz' % tmpdir
    in1 = '%s/in1' % tmpdir
    in2 = '%s/in2.bz2' % tmpdir

    with sh.open(out, 'w') as fh:
        fh.write('1')
    with sh.open(in1, 'w') as fh:
        fh.write('2\n3')
    with sh.open(in2, 'w') as fh:
        fh.write('4')

    # Expected line terminator, as it appears in the raw output bytes
    n = newline.encode('utf-8')

    # Append: the pre-existing '1' is kept and gets its own terminator
    sh.concatenate([in1, in2], out, 'a', newline=newline)
    with sh.open(out, 'rb') as fh:
        assert fh.read() == b'1' + n + b'2' + n + b'3' + n + b'4' + n

    # Defaults to mode='w': the previous contents of the output are discarded
    sh.concatenate([in1, in2], out, newline=newline)
    with sh.open(out, 'rb') as fh:
        assert fh.read() == b'2' + n + b'3' + n + b'4' + n


@pytest.mark.parametrize('newline', ['\n', '\r\n'])
def test_concatenate_t2(tmpdir, newline):
    # Output file already exists and is non-empty. Files end with a newline.
    out = '%s/out' % tmpdir
    in1 = '%s/in1' % tmpdir
    in2 = '%s/in2' % tmpdir

    with sh.open(out, 'w', newline=newline) as fh:
        fh.write('1\n')
    with sh.open(in1, 'w', newline=newline) as fh:
        fh.write('2\n3\n')
    with sh.open(in2, 'w', newline=newline) as fh:
        fh.write('4\n')

    # Expected line terminator, as it appears in the raw output bytes
    n = newline.encode('utf-8')

    # Append: no extra terminator is inserted after the pre-existing '1'
    sh.concatenate([in1, in2], out, 'a', newline=newline)
    with sh.open(out, 'rb') as fh:
        assert fh.read() == b'1' + n + b'2' + n + b'3' + n + b'4' + n

    # Default mode='w': the previous contents of the output are discarded
    sh.concatenate([in1, in2], out, newline=newline)
    with sh.open(out, 'rb') as fh:
        assert fh.read() == b'2' + n + b'3' + n + b'4' + n


def test_concatenate_t3(tmpdir):
    # Output file already exists and it is empty
    out = '%s/out' % tmpdir
    in1 = '%s/in1' % tmpdir
    in2 = '%s/in2' % tmpdir

    # Create the output as a zero-length file
    with sh.open(out, 'w') as fh:
        pass
    with sh.open(in1, 'w') as fh:
        fh.write('2\n')
    with sh.open(in2, 'w') as fh:
        fh.write('3\n')

    # Append to the empty output: no leading newline must appear
    sh.concatenate([in1, in2], out, 'a')
    with sh.open(out) as fh:
        assert fh.read() == '2\n3\n'

    # Default mode='w' yields the same contents
    sh.concatenate([in1, in2], out)
    with sh.open(out) as fh:
        assert fh.read() == '2\n3\n'


def test_concatenate_t4(tmpdir):
    # Output file does not already exist
    out = '%s/out' % tmpdir
    in1 = '%s/in1' % tmpdir
    in2 = '%s/in2' % tmpdir

    # Inputs deliberately lack a trailing newline
    with sh.open(in1, 'w') as fh:
        fh.write('2')
    with sh.open(in2, 'w') as fh:
        fh.write('3')

    # Append mode creates the missing output; each input gets terminated
    sh.concatenate([in1, in2], out, 'a')
    with sh.open(out) as fh:
        assert fh.read() == '2\n3\n'

    # Default mode='w' replaces the file written above
    sh.concatenate([in1, in2], out)
    with sh.open(out) as fh:
        assert fh.read() == '2\n3\n'


def test_concatenate_b(tmpdir):
    # Binary mode: inputs are joined byte for byte, nothing added in between
    out, in1, in2 = (
        '%s/%s' % (tmpdir, name) for name in ('out', 'in1', 'in2')
    )

    # Seed the output and both inputs with one byte each
    for fname, payload in ((out, b'1'), (in1, b'2'), (in2, b'3')):
        with sh.open(fname, 'wb') as fh:
            fh.write(payload)

    # 'ab' keeps the existing byte and appends the inputs after it
    sh.concatenate([in1, in2], out, 'ab')
    with sh.open(out, 'rb') as fh:
        appended = fh.read()
    assert appended == b'123'

    # 'wb' starts the output from scratch
    sh.concatenate([in1, in2], out, 'wb')
    with sh.open(out, 'rb') as fh:
        overwritten = fh.read()
    assert overwritten == b'23'
2 changes: 1 addition & 1 deletion pshell/tests/test_open.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def test_open_exclusive_success(tmpdir, openfunc, ext, compression):
with sh.open(fname, 'x', compression=compression) as fh:
fh.write("Hello world")
with openfunc(fname, 'rt') as fh:
assert fh.read() == "Hello world" \
assert fh.read() == "Hello world"


@compression_param
Expand Down

0 comments on commit ae5106b

Please sign in to comment.