Skip to content

Commit

Permalink
Fast Merge bug fixes (#2497)
Browse files Browse the repository at this point in the history
  • Loading branch information
farizrahman4u committed Jul 27, 2023
1 parent 48e9073 commit fe1a109
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 13 deletions.
6 changes: 4 additions & 2 deletions deeplake/core/chunk_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ def last_appended_chunk_name(self) -> str:
def last_appended_chunk_id(self) -> str:
return self.chunk_id_encoder.get_id_for_chunk(-1)

def last_appended_chunk(self) -> Optional[BaseChunk]:
def last_appended_chunk(self, allow_copy=True) -> Optional[BaseChunk]:
last_index = self.num_samples - 1
if self.num_chunks == 0 or last_index in self.tile_encoder:
return None
Expand All @@ -556,6 +556,8 @@ def last_appended_chunk(self) -> Optional[BaseChunk]:
chunk.key = chunk_key
chunk.id = self.last_appended_chunk_id
if chunk_commit_id != self.commit_id:
if not allow_copy:
return None
chunk = self.copy_chunk_to_new_commit(chunk, chunk_name)
if (
self.active_appended_chunk is not None
Expand Down Expand Up @@ -1010,7 +1012,7 @@ def _extend(self, samples, progressbar, pg_callback=None, update_commit_diff=Tru
)
self._samples_to_chunks(
samples,
start_chunk=self.last_appended_chunk(),
start_chunk=self.last_appended_chunk(allow_copy=False),
register=True,
progressbar=progressbar,
update_commit_diff=update_commit_diff,
Expand Down
1 change: 1 addition & 0 deletions deeplake/core/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,7 @@ def create_tensor_like(
def _rename_tensor(self, name, new_name):
tensor = self[name]
tensor.meta.name = new_name
tensor.meta.is_dirty = True
key = self.version_state["tensor_names"].pop(name)
meta = self.meta
if key not in meta.hidden_tensors:
Expand Down
29 changes: 18 additions & 11 deletions deeplake/util/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,11 @@ def find_updated_and_conflicts(
target_id_to_index_map: A dictionary mapping sample ids to their index in the target id.
Returns:
A tuple of three lists: updated indexes and conflict indexes, each holding tuples of the form (original_idx, target_idx),
and resurrect_indexes, holding target indexes.
"""
updated_indexes: List[Tuple[int, int]] = []
conflict_indexes: List[Tuple[int, int]] = []
resurrect_indexes: List[int] = []
for id in target_id_changes_commit_map:
target_commit_ids = target_id_changes_commit_map[id]
original_commit_ids = original_id_changes_commit_map[id]
Expand All @@ -403,15 +404,18 @@ def find_updated_and_conflicts(
idx is not None and target_commit_ids[idx] == original_commit_ids[0]
):
target_idx: int = target_id_to_index_map[id]
original_idx: int = original_id_to_index_map[id]
updated_indexes.append((original_idx, target_idx))
try:
original_idx: int = original_id_to_index_map[id]
updated_indexes.append((original_idx, target_idx))
except KeyError:
resurrect_indexes.append(target_idx)

# if no id is common or if a commit id other than the most recent commit_id is in common, there's a conflict
elif idx is None or idx > 0:
target_idx = target_id_to_index_map[id]
original_idx = original_id_to_index_map[id]
conflict_indexes.append((original_idx, target_idx))
return updated_indexes, conflict_indexes
return updated_indexes, conflict_indexes, resurrect_indexes


def find_new_updated_and_conflict_indexes(
Expand Down Expand Up @@ -466,12 +470,13 @@ def find_new_updated_and_conflict_indexes(
)
conflict_indexes: List[Tuple[int, int]] = []
updated_indexes: List[Tuple[int, int]] = []
updated_indexes, conflict_indexes = find_updated_and_conflicts(
updated_indexes, conflict_indexes, resurrect_indexes = find_updated_and_conflicts(
original_id_changes_commit_map,
target_id_changes_commit_map,
original_id_to_index_map,
target_id_to_index_map,
)
new_indexes.extend(resurrect_indexes)
return new_indexes, updated_indexes, conflict_indexes


Expand Down Expand Up @@ -692,7 +697,7 @@ def _group_ranges(x):
def _merge_encodings(enc1, enc2, start, end, off1=None, off2=None):
n1 = len(enc1)
if not n1:
return enc2
return enc2[start:end]
n2 = len(enc2)
if not n2:
return enc1
Expand Down Expand Up @@ -742,6 +747,8 @@ def _get_required_chunks_for_range(tensor, start, end):
else:
return (start_row, start_row + 1), None, None
elif num_required_chunks == 2:
if start_chunk_aligned and end_chunk_aligned:
return (start_row, end_row + 1), None, None
if not start_chunk_aligned and not end_chunk_aligned:
return None, (start, end), None
if start_chunk_aligned:
Expand Down Expand Up @@ -927,8 +934,8 @@ def copy_tensor_slice(
dest_seq_encoder = dest_eng.sequence_encoder
dest_seq_encoder.is_dirty = True
dest_meta_seq_length = 0
dest_tensor.meta.links = {}
links = dest_tensor.meta.links
dest_tensor.meta.links = {}
try:
for start, end in ranges:
if is_seq:
Expand Down Expand Up @@ -976,10 +983,10 @@ def copy_tensor_slice(
dest_meta.length = dest_meta_orig_length + (
dest_meta_seq_length if is_seq else dest_meta_length
)
dest_meta.is_dirty = True
dest_storage.flush()
finally:
dest_tensor.meta.links = links
dest_meta.is_dirty = True
dest_storage.flush()
if _copy_link_tensors:
if not is_seq:
flat_ranges = ranges
Expand All @@ -990,9 +997,9 @@ def copy_tensor_slice(
]
for l, flat in links:
dest_link_tensor = getattr(dest_tensor, l, None)
if dest_link_tensor:
if dest_link_tensor is not None:
src_link_tensor = getattr(src_tensor, l, None)
if src_link_tensor:
if src_link_tensor is not None:
copy_tensor_slice(
src_ds,
dest_ds,
Expand Down

0 comments on commit fe1a109

Please sign in to comment.