Skip to content

Commit 7f3ccfa

Browse files
Delete loose files per pack creation incrementally (#193)
This allows deleting loose objects immediately after each pack is closed and committed to the database, instead of deleting them all in a manual cleanup step later. --------- Co-authored-by: Giovanni Pizzi <gio.piz@gmail.com>
1 parent 5b44f09 commit 7f3ccfa

7 files changed

Lines changed: 721 additions & 13 deletions

File tree

disk_objectstore/container.py

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1261,6 +1261,7 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
12611261
validate_objects: bool = True,
12621262
do_fsync: bool = True,
12631263
callback: None | (Callable[[str, Any], None]) = None,
1264+
clean_loose_per_pack: bool = False,
12641265
) -> None:
12651266
"""Pack all loose objects.
12661267
@@ -1276,10 +1277,11 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
12761277
Note that since loose objects are always uncompressed, `CompressMode.KEEP` is equivalent to
12771278
`CompressMode.NO` in this function.
12781279
:param validate_objects: if True, recompute the hash while packing, and raises if there is a problem.
1279-
:param do_fsync: if True, calls a flush to disk of the pack files before closing it.
1280+
:param do_fsync: if True, calls a flush to disk of each pack file just before closing it.
12801281
Needed to guarantee that data will be there even in the case of a power loss.
1281-
Set this to False if you don't need such a guarantee (major risk of data loss if power
1282-
supply stops during packing operation-we don't recommend this).
1282+
Set to False if you don't need such a guarantee (major risk of data loss if this is set to False, and power
1283+
supply stops during packing operation). If you set `do_fsync=False` at least use
1284+
`clean_loose_per_pack=False`, and do a manual `fsync` before cleaning the loose objects via `clean_storage`.
12831285
:param callback: a callback function that can be used to report progress.
12841286
The callback function should accept two arguments: a string with the action being performed
12851287
and the value of the action. The action can be "init" (initialization),
@@ -1289,6 +1291,8 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
12891291
In case of "update", the value is amount of the operation that has been completed.
12901292
In case of "close", the value is None.
12911293
return value of the callback function is ignored.
1294+
:param clean_loose_per_pack: if True, the loose files that went into a `pack` are deleted immediately after this
1295+
`pack` is created.
12921296
"""
12931297
hash_type = self.hash_type if validate_objects else None
12941298

@@ -1336,9 +1340,6 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
13361340
# Here, I could do some clean up of loose objects that are already in the packs,
13371341
# by removing all loose objects with hash key `existing_packed_hashkeys`, that are
13381342
# already packed.
1339-
# HOWEVER, while this would work fine on Linux, there are concurrency issues both
1340-
# on Mac and on Windows (see issues #37 and #43). Therefore, I do NOT delete them,
1341-
# and deletion is deferred to a manual clean-up operation.
13421343

13431344
if callback:
13441345
callback(
@@ -1357,6 +1358,9 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
13571358
while loose_objects:
13581359
# Store the last pack integer ID, needed to know later if I need to open a new pack
13591360
last_pack_int_id = pack_int_id
1361+
# Track which objects were added to this pack for cleanup
1362+
packed_in_current_pack: list[str] = []
1363+
13601364
# Avoid concurrent writes on the pack file
13611365
with self.lock_pack(str(pack_int_id)) as pack_handle:
13621366
# Inner loop: continue until when there is a file, or
@@ -1431,6 +1435,9 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
14311435
# Appending for later bulk commit - see comments in add_streamed_objects_to_pack
14321436
obj_dicts.append(obj_dict)
14331437

1438+
# Track this object for potential cleanup
1439+
packed_in_current_pack.append(loose_hashkey)
1440+
14341441
if callback:
14351442
callback(
14361443
'update',
@@ -1459,6 +1466,8 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
14591466
)
14601467

14611468
# OK, if we are here, file was flushed, synced to disk and closed.
1469+
# Note that this is only the case if do_fsync is True. This is the default,
1470+
# and we warn users that they should not set it to False.
14621471
# Let's commit then the information to the DB, so it's officially a
14631472
# packed object. Note: committing as soon as we are done with one pack,
14641473
# so if there's a problem with one pack we don't start operating on the next one
@@ -1469,12 +1478,31 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
14691478
# Then, it would be safe to already do some clean up of loose objects that are now packed,
14701479
# and by doing it here we would do it after each pack.
14711480
# This would mean keeping track of the loose objects added to packs, and removing them.
1472-
# HOWEVER, while this would work fine on Linux, there are concurrency issues both
1473-
# on Mac and on Windows (see issues #37 and #43). Therefore, I do NOT delete them,
1474-
# and deletion is deferred to a manual clean-up operation.
1481+
1482+
# Clean up loose objects for this pack if requested
1483+
if clean_loose_per_pack and packed_in_current_pack:
1484+
self._clean_loose_objects(packed_in_current_pack)
14751485
if callback:
14761486
callback('close', None)
14771487

1488+
def _clean_loose_objects(self, hashkeys: list[str]) -> None:
1489+
"""Clean up specific loose objects that have been successfully packed.
1490+
1491+
:param hashkeys: List of hashkeys to clean up from loose objects.
1492+
These should be objects that were just packed.
1493+
"""
1494+
1495+
# Simply remove the loose files for the given hashkeys
1496+
# We trust that the caller has already ensured these are packed
1497+
for obj_hashkey in hashkeys:
1498+
try:
1499+
os.remove(self._get_loose_path_from_hashkey(obj_hashkey))
1500+
except (FileNotFoundError, PermissionError):
1501+
# FileNotFoundError: file might already be removed by another process
1502+
# PermissionError: file might be locked (especially on Windows)
1503+
# In both cases, we just continue - the file will be cleaned up later
1504+
pass
1505+
14781506
def add_streamed_object_to_pack( # pylint: disable=too-many-arguments
14791507
self,
14801508
stream: StreamSeekBytesType,

disk_objectstore/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1302,7 +1302,7 @@ def safe_flush_to_disk(
13021302
real_path: Path,
13031303
use_fullsync: bool = False,
13041304
) -> None:
1305-
"""Tries to to its best to safely commit to disk.
1305+
"""Tries to do its best to safely commit to disk.
13061306
13071307
Note that calling this is needed to reduce the risk of data loss due to, e.g., a power failure.
13081308
However, this call is typically expensive so should be called only if the guarantees are really needed, and

tests/concurrent_tests/periodic_packer.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,13 @@ def timestamp():
2626
)
2727
@click.option('-r', '--repetitions', default=10, help='Number of repetitions before stopping.')
2828
@click.option('-w', '--wait-time', default=0.83, help='Time to wait between iterations.')
29+
@click.option(
30+
'--clean-loose-per-pack/--no-clean-loose-per-pack',
31+
default=False,
32+
help='If set, clean loose objects after every pack operation.',
33+
)
2934
@click.help_option('-h', '--help')
30-
def main(path, repetitions, wait_time):
35+
def main(path, repetitions, wait_time, clean_loose_per_pack):
3136
"""Periodically pack container."""
3237
container = Container(path)
3338
if not container.is_initialised:
@@ -39,7 +44,10 @@ def main(path, repetitions, wait_time):
3944

4045
compress_packs = iteration % 2 == 0
4146
start_counts = container.count_objects()
42-
container.pack_all_loose(compress=compress_packs)
47+
container.pack_all_loose(
48+
compress=compress_packs,
49+
clean_loose_per_pack=clean_loose_per_pack,
50+
)
4351
print(f'[PACKER {timestamp()}] Done packing!')
4452
end_counts = container.count_objects()
4553

tests/test_basic.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,88 @@ def test_exclusive_mode_windows(temp_dir, lock_file_on_windows):
355355
os.close(fd)
356356

357357

358+
def test_clean_loose_objects_empty(temp_container):
359+
"""Test early return."""
360+
result = temp_container._clean_loose_objects([])
361+
assert result is None
362+
363+
364+
def test_clean_loose_objects_file_not_found(temp_container):
365+
"""Test _clean_loose_objects when loose file doesn't exist."""
366+
# Create a fake hashkey that won't have a corresponding loose file
367+
fake_hashkey = 'a' * 64 # Valid hex string of typical hash length
368+
369+
# This should hit the FileNotFoundError exception, which, however, is being captured, so no raise
370+
temp_container._clean_loose_objects([fake_hashkey])
371+
372+
373+
def test_clean_loose_objects_successful_cleanup(temp_container):
374+
"""Test _clean_loose_objects successfully removes loose files."""
375+
contents = [b'content1', b'content2', b'content3']
376+
hashkeys = []
377+
loose_paths = []
378+
379+
for content in contents:
380+
hashkey = temp_container.add_object(content)
381+
hashkeys.append(hashkey)
382+
loose_paths.append(temp_container._get_loose_path_from_hashkey(hashkey))
383+
384+
# Verify all loose files exist
385+
assert all(path.exists() for path in loose_paths)
386+
387+
temp_container._clean_loose_objects(hashkeys)
388+
389+
# Verify all loose files are removed
390+
assert all(not path.exists() for path in loose_paths)
391+
392+
393+
def test_clean_loose_objects_mixed_scenarios(temp_container):
394+
"""Test _clean_loose_objects with mix of existing and non-existing files."""
395+
# Create one real loose object
396+
content = b'real content'
397+
real_hashkey = temp_container.add_object(content)
398+
real_path = temp_container._get_loose_path_from_hashkey(real_hashkey)
399+
assert real_path.exists()
400+
401+
# Create fake hashkeys
402+
fake_hashkey1 = 'b' * 64
403+
fake_hashkey2 = 'c' * 64
404+
405+
# Mix real and fake hashkeys
406+
mixed_hashkeys = [fake_hashkey1, real_hashkey, fake_hashkey2]
407+
408+
# This should handle both successful deletion and FileNotFoundError
409+
temp_container._clean_loose_objects(mixed_hashkeys)
410+
411+
# Real file should be deleted, fake ones just ignored
412+
assert not real_path.exists()
413+
414+
415+
@pytest.mark.skipif(os.name != 'nt', reason='PermissionError test only relevant on Windows')
416+
def test_clean_loose_objects_permission_error_windows(temp_container, lock_file_on_windows):
417+
"""Test _clean_loose_objects when file is locked (Windows only)."""
418+
# Create a loose object
419+
content = b'test content'
420+
hashkey = temp_container.add_object(content)
421+
422+
# Verify the loose file exists
423+
loose_path = temp_container._get_loose_path_from_hashkey(hashkey)
424+
assert loose_path.exists()
425+
426+
# Lock the file on Windows
427+
fd = os.open(loose_path, os.O_RDONLY)
428+
try:
429+
lock_file_on_windows(fd)
430+
431+
# This should hit the PermissionError exception path
432+
temp_container._clean_loose_objects([hashkey])
433+
434+
# File should still exist since it was locked
435+
assert loose_path.exists()
436+
finally:
437+
os.close(fd)
438+
439+
358440
def test_get_pack_id_to_write_to_with_known_sizes(temp_container):
359441
"""Unit test: Verify _get_pack_id_to_write_to correctly uses known_sizes parameter.
360442

0 commit comments

Comments
 (0)