Improved support for saving typechanges #6793
Conversation
Solution looks plausible to me, but I don't feel I have a good grasp on `GitRepo.save_`. Hence, I am somewhat relying on the passing tests, and confirming the test with my approval here. :)
It seems that the Windows failure on AppVeyor is "legit", as it happens in `test_save_typechange`:

================================== FAILURES ===================================
____________________________ test_save_typechange _____________________________

self = <datalad.support.parallel.ProducerConsumerProgressLog object at 0x000002B08009BC10>
jobs = 1

    def _iter_threads(self, jobs):
        self._interrupted = False
        self._producer_finished = False
        self._producer_exception = None
        self._producer_interrupt = None
        # To allow feeding producer queue with more entries, possibly from consumer!
        self._producer_queue = producer_queue = Queue()
        consumer_queue = Queue()

        def producer_worker():
            """That is the one which interrogates producer and updates .total"""
            try:
                for value in self._producer_iter:
                    if self._producer_interrupt:
                        raise InterruptedError("Producer thread was interrupted due to %s" % self._producer_interrupt)
                    self.add_to_producer_queue(value)
            except InterruptedError:
                pass  # There is some outside exception which will be raised
            except BaseException as e:
                self._producer_exception = e
            finally:
                self._producer_finished = True

        def consumer_worker(callable, *args, **kwargs):
            """Since jobs could return a generator and we cannot really "inspect" for that
            """
            res = callable(*args, **kwargs)
            if inspect.isgenerator(res):
                lgr.debug("Got consumer worker which returned a generator %s", res)
                didgood = False
                for r in res:
                    didgood = True
                    lgr.debug("Adding %s to queue", r)
                    consumer_queue.put(r)
                if not didgood:
                    lgr.error("Nothing was obtained from %s :-(", res)
            else:
                lgr.debug("Got straight result %s, not a generator", res)
                consumer_queue.put(res)

        self._producer_thread = Thread(target=producer_worker)
        self._producer_thread.start()
        self._futures = futures = {}

        lgr.debug("Initiating ThreadPoolExecutor with %d jobs", jobs)
        # we will increase sleep_time when doing nothing useful
        sleeper = Sleeper()
        interrupted_by_exception = None
        with concurrent.futures.ThreadPoolExecutor(jobs) as executor:
            self._executor = executor
            # yield from the producer_queue (.total and .finished could be accessed meanwhile)
            while True:
                try:
                    done_useful = False
                    if self.reraise_immediately and self._producer_exception and not interrupted_by_exception:
                        # so we have a chance to exit gracefully
                        # No point to reraise if there is already an exception which was raised
                        # which might have even been this one
                        lgr.debug("Reraising an exception from producer as soon as we found it")
                        raise self._producer_exception
                    if (self._producer_finished and
                            not futures and
                            consumer_queue.empty() and
                            producer_queue.empty()):
                        # This will let us not "escape" the while loop and reraise any possible exception
                        # within the loop if we have any.
                        # Otherwise we might see "RuntimeError: generator ignored GeneratorExit"
                        # when e.g. we did continue upon interrupted_by_exception, and then
                        # no other subsequent exception was raised and we left the loop
>                       raise _FinalShutdown()
E                       datalad.support.parallel._FinalShutdown

..\datalad\support\parallel.py:368: _FinalShutdown

During handling of the above exception, another exception occurred:

path = 'C:\\DLTMP\\datalad_temp_ng40ii_i'

    @with_tempfile
    def test_save_typechange(path=None):
        ckwa = dict(result_renderer='disabled')
        ds = Dataset(path).create(**ckwa)
        foo = ds.pathobj / 'foo'
        # save a file
        foo.write_text('some')
        ds.save(**ckwa)
        # now delete the file and replace with a directory and a file in it
        foo.unlink()
        foo.mkdir()
        bar = foo / 'bar'
        bar.write_text('foobar')
        res = ds.save(**ckwa)
        assert_in_results(res, path=str(bar), action='add', status='ok')
        assert_repo_status(ds.repo)
        # now replace file with subdataset
        # (this is https://github.com/datalad/datalad/issues/5418)
        bar.unlink()
        Dataset(ds.pathobj / 'tmp').create(**ckwa)
        shutil.move(ds.pathobj / 'tmp', bar)
>       res = ds.save(**ckwa)

..\datalad\support\tests\test_repo_save.py:104:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
..\datalad\distribution\dataset.py:502: in apply_func
    return f(*args, **kwargs)
..\datalad\interface\utils.py:447: in eval_func
    return return_func(*args, **kwargs)
..\datalad\interface\utils.py:439: in return_func
    results = list(results)
..\datalad\interface\utils.py:357: in generator_func
    for r in _process_results(
..\datalad\interface\utils.py:544: in _process_results
    for res in results:
..\datalad\core\local\save.py:389: in __call__
    yield from ProducerConsumerProgressLog(
..\datalad\support\parallel.py:535: in __iter__
    for res in super().__iter__():
..\datalad\support\parallel.py:265: in __iter__
    yield from self._iter_threads(self._jobs)
..\datalad\support\parallel.py:417: in _iter_threads
    self.shutdown(force=True, exception=self._producer_exception or interrupted_by_exception)
..\datalad\support\parallel.py:233: in shutdown
    raise exception
..\datalad\support\parallel.py:401: in _iter_threads
    done_useful |= self._pop_done_futures(lgr)
..\datalad\support\parallel.py:463: in _pop_done_futures
    raise exception
C:\Python39-x64\lib\concurrent\futures\thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
..\datalad\support\parallel.py:329: in consumer_worker
    for r in res:
..\datalad\core\local\save.py:308: in save_ds
    for res in pds_repo.save_(
..\datalad\support\gitrepo.py:3509: in save_
    self._save_post(message, chain(*status_state.values()), need_partial_commit, amend=amend,
..\datalad\support\annexrepo.py:3529: in _save_post
    self.localsync(managed_only=True)
..\datalad\support\annexrepo.py:3613: in localsync
    self.call_annex(cmd)
..\datalad\support\annexrepo.py:1269: in call_annex
    return self._call_annex(
..\datalad\support\annexrepo.py:946: in _call_annex
    return runner.run(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

    def run(self,
            cmd,
            protocol=None,
            stdin=None,
            cwd=None,
            env=None,
            timeout=None,
            exception_on_error=True,
            **kwargs):
        """Execute a command and communicate with it.

        Parameters
        ----------
        cmd : list or str
            Sequence of program arguments. Passing a single string causes
            execution via the platform shell.
        protocol : WitlessProtocol, optional
            Protocol class handling interaction with the running process
            (e.g. output capture). A number of pre-crafted classes are
            provided (e.g `KillOutput`, `NoCapture`, `GitProgress`).
            If the protocol has the GeneratorMixIn-mixin, the run-method
            will return an iterator and can therefore be used in a for-clause.
        stdin : file-like, string, bytes, Queue, or None
            If stdin is a file-like, it will be directly used as stdin for the
            subprocess. The caller is responsible for writing to it and closing it.
            If stdin is a string or bytes, those will be fed to stdin of the
            subprocess. If all data is written, stdin will be closed.
            If stdin is a Queue, all elements (bytes) put into the Queue will
            be passed to stdin until None is read from the queue. If None is read,
            stdin of the subprocess is closed.
        cwd : path-like, optional
            If given, commands are executed with this path as PWD,
            the PWD of the parent process is used otherwise. Overrides
            any `cwd` given to the constructor.
        env : dict, optional
            Environment to be used for command execution. If `cwd`
            was given, 'PWD' in the environment is set to its value.
            This must be a complete environment definition, no values
            from the current environment will be inherited. Overrides
            any `env` given to the constructor.
        timeout:
            None or the seconds after which a timeout callback is
            invoked, if no progress was made in communicating with
            the sub-process, or if waiting for the subprocess exit
            took more than the specified time. See the protocol and
            `ThreadedRunner` descriptions for a more detailed discussion
            on timeouts.
        exception_on_error : bool, optional
            This argument is only interpreted if the protocol is a subclass
            of `GeneratorMixIn`. If it is `True` (default), a
            `CommandErrorException` is raised by the generator if the
            sub process exited with a return code not equal to zero. If the
            parameter is `False`, no exception is raised. In both cases the
            return code can be read from the attribute `return_code` of
            the generator.
        kwargs :
            Passed to the Protocol class constructor.

        Returns
        -------
        Union[Any, Generator]
            If the protocol is not a subclass of `GeneratorMixIn`, the
            result of protocol._prepare_result will be returned.
            If the protocol is a subclass of `GeneratorMixIn`, a Generator
            will be returned. This allows to use this method in constructs like:

                for protocol_output in runner.run():
                    ...

            Where the iterator yields whatever protocol.pipe_data_received
            sends into the generator.
            If all output was yielded and the process has terminated, the
            generator will raise StopIteration(return_code), where
            return_code is the return code of the process. The return code
            of the process will also be stored in the "return_code"-attribute
            of the runner. So you could write:

                gen = runner.run()
                for file_descriptor, data in gen:
                    ...
                # get the return code of the process
                result = gen.return_code

        Raises
        ------
        CommandError
            On execution failure (non-zero exit code) this exception is
            raised which provides the command (cmd), stdout, stderr,
            exit code (status), and a message identifying the failed
            command, as properties.
        FileNotFoundError
            When a given executable does not exist.
        """
        if protocol is None:
            # by default let all subprocess stream pass through
            protocol = NoCapture

        cwd = cwd or self.cwd
        env = self._get_adjusted_env(
            env or self.env,
            cwd=cwd,
        )

        lgr.debug('Run %r (cwd=%s)', cmd, cwd)
        self.threaded_runner = ThreadedRunner(
            cmd=cmd,
            protocol_class=protocol,
            stdin=stdin,
            protocol_kwargs=kwargs,
            timeout=timeout,
            exception_on_error=exception_on_error,
            cwd=cwd,
            env=env
        )
        results_or_iterator = self.threaded_runner.run()

        if issubclass(protocol, GeneratorMixIn):
            return results_or_iterator
        else:
            results = results_or_iterator

        # log before any exception is raised
        lgr.debug("Finished %r with status %s", cmd, results['code'])

        # make it such that we always blow if a protocol did not report
        # a return code at all
        if results.get('code', True) not in [0, None]:
            # the runner has a better idea, doc string warns Protocol
            # implementations not to return these
            results.pop('cmd', None)
            results.pop('cwd', None)
>           raise CommandError(
                # whatever the results were, we carry them forward
                cmd=cmd,
                cwd=self.cwd,
                **results,
            )
E           datalad.runner.exception.CommandError: CommandError: '"git" "-c" "diff.ignoreSubmodules=none" "annex" "sync" "--no-push" "--no-pull" "--no-commit" "--no-resolvemerge" "--no-content" "-c" "annex.dotfiles=true"' failed with exitcode 1 under C:\DLTMP\datalad_temp_ng40ii_i [err: 'fatal: entry 'bar' object type (blob) doesn't match mode type (commit)
E           git-annex: user error (git ["--git-dir=.git","--work-tree=.","--literal-pathspecs","-c","annex.dotfiles=true","mktree","--batch","-z"] exited 128)']

..\datalad\runner\runner.py:201: CommandError
============================== warnings summary ===============================
This needs to deal with the Windows failure one way or another.
I don't think there is much I can do about this, other than not running this test on Windows. I will try to produce a standalone reproducer for this failure and file a bug for git-annex if I succeed.
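For reference, a minimal standalone sketch of the failing sequence, distilled from the quoted `test_save_typechange`. This is only my simplification, not the eventual bug report: the `demo` path is arbitrary, and it assumes datalad and git-annex are installed.

```python
# Repro sketch distilled from test_save_typechange.
# Assumptions: datalad + git-annex installed; 'demo' is a scratch path.
import shutil
from datalad.api import Dataset

ds = Dataset('demo').create()
bar = ds.pathobj / 'foo' / 'bar'
bar.parent.mkdir()
bar.write_text('foobar')
ds.save()  # 'bar' is now an annexed file (a blob in the tree)

# replace the annexed file with a freshly created (sub)dataset
bar.unlink()
Dataset(ds.pathobj / 'tmp').create()
shutil.move(str(ds.pathobj / 'tmp'), str(bar))

# On Windows (adjusted branch), the `git annex sync` issued during save
# fails here with:
#   fatal: entry 'bar' object type (blob) doesn't match mode type (commit)
ds.save()
```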
This changeset adds tests verifying that `save()` can handle the following type changes in a dataset:

- replace a file with a directory (that has content)
- replace a file with a (sub)dataset
- replace a directory with a (sub)dataset
- replace a subdataset with a file

`GitRepo.save_()` was fixed to detect and handle replacing a file with a (sub)dataset.

At the moment, the (rather complex) case of replacing a directory that contains a registered subdataset with a new subdataset requires two calls to `save()`, instead of being able to bring the dataset to a clean state in one go; see the sketch below. The first call automatically unregisters the vanished subdataset, and the second call registers the new one.

Fixes datalad#5418
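To make that remaining limitation concrete, here is a hedged sketch of the two-call sequence. All paths (`demo2`, `dir`, `sub`, `tmp`) are illustrative; the create-in-`tmp`-then-move pattern mirrors the test above.

```python
# Sketch of the documented two-step save for the case of a directory that
# contains a registered subdataset being replaced by a new subdataset.
import shutil
from datalad.api import Dataset

ds = Dataset('demo2').create()
(ds.pathobj / 'dir').mkdir()
Dataset(ds.pathobj / 'dir' / 'sub').create()
ds.save()  # registers 'dir/sub' as a subdataset

# replace the whole directory (and the subdataset within) by a new subdataset
# (on Windows, rmtree may need an onerror handler for read-only .git files)
shutil.rmtree(ds.pathobj / 'dir')
Dataset(ds.pathobj / 'tmp').create()
shutil.move(str(ds.pathobj / 'tmp'), str(ds.pathobj / 'dir'))

ds.save()  # first call: unregisters the vanished 'dir/sub'
ds.save()  # second call: registers the new subdataset at 'dir'; clean now
```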
Changed base to maint
Codecov Report
@@            Coverage Diff             @@
##            maint    #6793      +/-   ##
==========================================
- Coverage   91.23%   91.21%   -0.02%
==========================================
  Files         354      354
  Lines       46110    46123      +13
==========================================
+ Hits        42070    42073       +3
- Misses       4040     4050      +10

Continue to review full report at Codecov.
The metalad failure is unrelated to this PR and observable elsewhere too. The Windows issue is minor and git-annex related, filed here: #6857. This is ready to merge.
Changelog

🐛 Bug Fixes

- `save` can now handle various type changes in a dataset, such as a file replaced by a directory or a subdataset, a directory replaced by a subdataset, or a subdataset replaced by a file. Fixes "save: does not add/commit subds which replaces a symlink" #5418
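As a hedged illustration of one of the newly covered typechanges (a subdataset replaced by a plain file; names are made up and not from the test suite):

```python
# Illustrative only: a registered subdataset replaced by a plain file.
import shutil
from datalad.api import Dataset

ds = Dataset('demo3').create()
Dataset(ds.pathobj / 'sub').create()
ds.save()  # registers 'sub' as a subdataset

# on Windows, read-only .git files may need an onerror handler for rmtree
shutil.rmtree(ds.pathobj / 'sub')
(ds.pathobj / 'sub').write_text('now a plain file')

ds.save()  # with this change, a single save brings the dataset to a clean state
```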