Skip to content

Commit

Permalink
[feat] Add methods for sequence and container delete (#2946)
Browse files Browse the repository at this point in the history
  • Loading branch information
mihran113 committed Aug 4, 2023
1 parent d68bd02 commit a4e6bc1
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 17 deletions.
25 changes: 25 additions & 0 deletions src/python/aim/_sdk/collections.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from abc import abstractmethod
import logging
from collections import defaultdict
from typing import Iterator, Dict, Tuple, Type

from aim._sdk import type_utils
Expand All @@ -18,6 +20,8 @@
from aim._sdk.sequence import Sequence
from aim._core.storage.treeview import TreeView

logger = logging.getLogger(__name__)


class ContainerCollectionBase(ABCContainerCollection[ContainerType]):
def __init__(self, query_context: Dict):
Expand All @@ -38,6 +42,11 @@ def count(self) -> int:
# more optimal implementation
return sum(1 for _ in self.__iter_meta__())

def delete(self):
    """Delete every container matched by this collection.

    The repo is read from ``self.query_context['repo']`` and
    ``repo.delete_container`` is called once per item yielded by
    ``__iter_meta__``.

    All matching hashes are collected *before* the first deletion, so
    removing a container can never disturb the iteration that is
    producing the remaining hashes.
    """
    repo = self.query_context['repo']
    # Snapshot first: deleting while lazily iterating risks mutating the
    # source the iterator is still reading from.
    hashes = list(self.__iter_meta__())
    for hash_ in hashes:
        repo.delete_container(hash_)


class ContainerLimitCollection(ContainerCollectionBase['Container']):
def __init__(self, base_collection: ContainerCollectionBase['Container'], n: int, query_context):
Expand Down Expand Up @@ -131,6 +140,22 @@ def count(self):
# more optimal implementation
return sum(1 for _ in self.__iter_meta__())

def delete(self):
    """Delete every sequence matched by this collection.

    Matched ``(hash, name, context)`` triples from ``__iter_meta__`` are
    first grouped by container hash, so each container is opened only
    once (in ``'WRITE'`` mode) and all of its matched sequences are
    removed through ``Container.delete_sequence``.

    A container that raises ``RunLockingError`` is skipped with a
    warning; processing continues with the next container.
    """
    from aim._sdk import Container
    from aim._sdk.lock_manager import RunLockingError

    repo = self.query_context['repo']

    # Pass 1: collect what has to be deleted, keyed by owning container.
    sequences_by_container = defaultdict(list)
    for hash_, name, context in self.__iter_meta__():
        sequences_by_container[hash_].append((name, context))

    # Pass 2: open each container once and drop its matched sequences.
    for hash_, targets in sequences_by_container.items():
        try:
            container = Container(hash_, repo=repo, mode='WRITE')
            for name, context in targets:
                container.delete_sequence(name, context)
        except RunLockingError:
            logger.warning(f'Cannot delete sequences for container: {hash_}. Container is locked.')


class SequenceLimitCollection(SequenceCollectionBase['Sequence']):
def __init__(self, base_collection: SequenceCollectionBase['Sequence'], n: int, query_context: Dict):
Expand Down
36 changes: 35 additions & 1 deletion src/python/aim/_sdk/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def _close(self) -> None:

self._state['cleanup'] = True
self._wait_for_empty_queue()
self._set_end_time()
if not self._state.get('deleted'):
self._set_end_time()
if self._status_reporter is not None:
self._status_reporter.close()
if self._lock:
Expand Down Expand Up @@ -215,6 +216,39 @@ def _check(self, query, query_cache, *, aliases=()) -> bool:
query_params = {p: proxy for p in alias_names}
return query.check(**query_params)

def delete_sequence(self, name, context=None):
    """Delete the sequence identified by ``name`` and ``context``.

    :param name: sequence name, passed to ``self._sequence_map._sequence``.
    :param context: sequence context; ``None`` is treated as ``{}``.
    :raises RuntimeError: if the container is read-only.
    """
    if self._is_readonly:
        raise RuntimeError('Cannot delete sequence in read-only mode.')

    if context is None:
        context = {}
    # Resolve the sequence object and let it remove its own data.
    self._sequence_map._sequence(name, context).delete()

def delete(self):
    """Remove this container's stored data, then close it.

    Deletes the ``self.hash`` entry from the ``chunks`` subtree of the
    ``meta`` and ``seqs`` trees, from both ``(…, 'chunks')`` subtrees of
    the ``BLOBS`` tree, and from the ``cont_types_map`` subtree of the
    ``meta`` tree. ``self._state['deleted']`` is set before ``close()``
    is called.

    :raises RuntimeError: if the container is read-only.
    """
    if self._is_readonly:
        raise RuntimeError('Cannot delete container in read-only mode.')

    # Container metadata.
    meta = self.storage.tree(self.hash, 'meta', read_only=False)
    del meta.subtree('chunks')[self.hash]

    # Container sequence data.
    seqs = self.storage.tree(self.hash, 'seqs', read_only=False)
    del seqs.subtree('chunks')[self.hash]

    # Blob storage keeps one 'chunks' subtree per section.
    blobs = self.storage.tree(self.hash, 'BLOBS', read_only=False)
    for section in ('meta', 'seqs'):
        del blobs.subtree((section, 'chunks'))[self.hash]

    # Drop the entry from the container map.
    del meta.subtree('cont_types_map')[self.hash]

    # Mark as deleted before closing so the shutdown path can see the flag.
    self._state['deleted'] = True
    self.close()

@property
def sequences(self) -> 'ContainerSequenceMap':
return self._sequence_map
Expand Down
22 changes: 7 additions & 15 deletions src/python/aim/_sdk/lock_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,26 +138,18 @@ def release_locks(self, run_hash: str, force: bool) -> bool:
success = True
lock_path = self.locks_path / self.softlock_fname(run_hash)
if force:
# Force-release container locks if any
for container_dir in ('meta', 'seqs'):
soft_lock_path = self.repo_path / container_dir / 'locks' / self.softlock_fname(run_hash)
if soft_lock_path.exists():
soft_lock_path.unlink()
unix_lock_path = self.repo_path / container_dir / 'locks' / run_hash
if unix_lock_path.exists():
unix_lock_path.unlink()

# Force-release run lock
if lock_path.exists():
lock_path.unlink()
else:
lock_info = self.get_container_lock_info(run_hash)
if lock_info.locked and lock_info.version == LockingVersion.LEGACY:
success = False
elif lock_info.locked and self.is_stalled_lock(lock_path):
assert lock_info.version == LockingVersion.NEW
logger.info(f'Detected stalled lock for Run \'{run_hash}\'. Removing lock.')
lock_path.unlink()
if lock_info.locked:
if self.is_stalled_lock(lock_path):
assert lock_info.version == LockingVersion.NEW
logger.info(f'Detected stalled lock for Run \'{run_hash}\'. Removing lock.')
lock_path.unlink()
else:
success = False
return success

def is_stalled_lock(self, lock_file_path: Path) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion src/python/aim/_sdk/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def from_path(cls, path: str, read_only: bool = True) -> 'Repo':
return repo

@classmethod
def get_version(cls, path: str) -> Optional[tuple[int, ...]]:
def get_version(cls, path: str) -> Optional[Tuple[int, ...]]:
if cls.is_remote_path(path):
return None
path = clean_repo_path(path)
Expand Down
7 changes: 7 additions & 0 deletions src/python/aim/_sdk/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,13 @@ def _check(self, query, query_cache, *, aliases=()) -> bool:
query_params.update({cp: c_proxy for cp in container_alias_names})
return query.check(**query_params)

def delete(self):
    """Delete this sequence's stored data and reset its cached info.

    Removes the ``(context index, name)`` entry from the object returned
    by ``self._data_loader()`` and the matching
    ``(KeyNames.SEQUENCES, context index, name)`` entry from the
    container tree, then marks ``self._info`` as empty with the next
    step reset to 0.
    """
    # Tracked data for this sequence.
    loaded = self._data_loader()
    del loaded[(self._ctx_idx, self._name)]

    # Sequence entry in the owning container's tree.
    container_tree = self._container_tree
    del container_tree[(KeyNames.SEQUENCES, self._ctx_idx, self._name)]

    # Keep the in-memory info consistent with the now-empty storage.
    self._info.empty = True
    self._info.next_step = 0

def __repr__(self) -> str:
return f'<{self.get_typename()} #{hash(self)} name={self.name} context={self._ctx_idx}>'

Expand Down

0 comments on commit a4e6bc1

Please sign in to comment.