Skip to content

Commit

Permalink
the streams with no_split are not split now
Browse files Browse the repository at this point in the history
  • Loading branch information
Jose Blanca committed Aug 31, 2009
1 parent 8f7e6db commit f4bc17c
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 36 deletions.
87 changes: 51 additions & 36 deletions psubprocess/prunner.py
Expand Up @@ -302,11 +302,15 @@ def to_be_split_first(stream1, stream2):
for index in input_stream_indexes:
stream = streams[index]
#splitter
if 'splitter' not in stream:
splitter = None
if 'special' in stream and 'no_split' in stream['special']:
splitter = _create_non_splitter_splitter(copy_files=True)
elif 'splitter' not in stream:
msg = 'An splitter should be provided for every input stream'
msg += 'missing for: ' + str(stream)
raise ValueError(msg)
splitter = stream['splitter']
else:
splitter = stream['splitter']
#the splitter can be a re, in that case we create the function
if '__call__' not in dir(splitter):
splitter = _create_file_splitter_with_re(splitter)
Expand Down Expand Up @@ -339,6 +343,7 @@ def to_be_split_first(stream1, stream2):
split_files[index] = files #a list of files for every in stream

#we split the output stream files into several splits
output_splitter = _create_non_splitter_splitter(copy_files=False)
for index in output_stream_indexes:
stream = streams[index]
#for the output we just create the new names, but we don't split
Expand All @@ -347,7 +352,7 @@ def to_be_split_first(stream1, stream2):
fname = stream['fhand']
else:
fname = stream['fname']
files = _output_splitter(fname, work_dirs)
files = output_splitter(fname, work_dirs)
split_files[index] = files #a list of files for every in stream

new_streamss = []
Expand Down Expand Up @@ -591,43 +596,53 @@ def splitter(file_, work_dirs):
return new_files
return splitter

def _create_non_splitter_splitter(copy_files=False):
    '''It creates an splitter function that will not split the given file.

    The created splitter will create one file for every work_dir given. This
    file can be empty (useful for the output streams), or a symbolic link to
    the given file (useful for the no_split input streams).

    copy_files -- when True every new file is a symlink to the given file,
                  when False every new file is a new empty file.
    '''
    def splitter(file_, work_dirs):
        '''It creates one file for every split.

        Every new file will be located in one of the work_dirs (objects with
        a name attribute holding the directory path).
        file_ can be an fname (str) or an fhand (object with a name).
        It returns a list with one item per work_dir: fpaths if an fname was
        given, open file objects if an fhand was given.
        '''
        #the file_ can be an fname or an fhand. which one is it?
        file_is_str = isinstance(file_, str)
        if file_is_str:
            fname = file_
        else:
            fname = file_.name
        suffix = os.path.splitext(fname)[-1]

        new_files = []
        #we have to create one new file for every work_dir
        for work_dir in work_dirs:
            #we use delete=False because this temp file is in a temp dir that
            #will be completely deleted. If we use delete=True we get an error
            #because the file might be already deleted when its __del__ method
            #is called
            ofh = NamedTemporaryFile(dir=work_dir.name, suffix=suffix,
                                     delete=False)
            new_fpath = ofh.name
            if copy_files:
                #the temp file was only used to reserve a unique name, it is
                #replaced by a link to the original file
                ofh.close()
                os.remove(new_fpath)
                #a relative link target is resolved from the directory of the
                #link, not from the cwd, so the target has to be absolute
                os.symlink(os.path.abspath(fname), new_fpath)
                if not file_is_str:
                    #the old handle points to the removed temp file, the
                    #caller needs a handle to the linked content
                    ofh = open(new_fpath, 'rb')
            elif file_is_str:
                #only the name is needed, so we do not keep the file open
                ofh.close()
            #what do we need the fname or the fhand?
            if file_is_str:
                new_files.append(new_fpath)
            else:
                new_files.append(ofh)
        return new_files
    return splitter


def _output_splitter(file_, work_dirs):
    '''It creates one empty output file for every split.

    Every split will be located in one of the work_dirs.
    It returns a list with the fpaths (or fhands if an fhand was given) for
    the new output files.
    It is kept for the callers that still use it; it just delegates to the
    splitter built by _create_non_splitter_splitter(copy_files=False).
    '''
    return _create_non_splitter_splitter(copy_files=False)(file_, work_dirs)

def default_cat_joiner(out_file_, in_files_):
'''It joins the given in files into the given out file.
Expand Down
28 changes: 28 additions & 0 deletions test/prunner_test.py
Expand Up @@ -235,6 +235,34 @@ def test_kill_subjobs():
assert not open(stderr.name).read()
os.remove(bin)

@staticmethod
def test_nosplit():
    'It tests that we can set some input files not to be split'
    binary = create_test_binary()
    temp_files = []
    try:
        #with infile
        in_file = NamedTemporaryFile()
        temp_files.append(in_file)
        content = 'hola1\nhola2\n'
        in_file.write(content)
        in_file.flush()
        out_file = NamedTemporaryFile()
        temp_files.append(out_file)

        cmd = [binary]
        cmd.extend(['-i', in_file.name, '-t', out_file.name])
        stdout = NamedTemporaryFile()
        temp_files.append(stdout)
        stderr = NamedTemporaryFile()
        temp_files.append(stderr)
        cmd_def = [{'options': ('-i', '--input'), 'io': 'in',
                    'special':['no_split']},
                   {'options': ('-t', '--output'), 'io': 'out'}]
        splits = 4
        popen = Popen(cmd, stdout=stdout, stderr=stderr, cmd_def=cmd_def,
                      splits=splits)
        #waits till it finishes and looks at the retcode
        assert popen.wait() == 0
        with open(stdout.name) as fhand:
            assert not fhand.read()
        with open(stderr.name) as fhand:
            assert not fhand.read()
        #the input is not split, so every split writes the whole content
        with open(out_file.name) as fhand:
            assert fhand.read() == content * splits
    finally:
        #the clean up is done even if an assertion fails
        for temp_file in temp_files:
            temp_file.close()
        os.remove(binary)

if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']
unittest.main()

0 comments on commit f4bc17c

Please sign in to comment.