Partitioned HNSW Deeplake Side Changes. #2847

Merged · 24 commits · Jun 13, 2024
Commits (24)
5e99d16
Partitioned HNSW Deeplake Side Changes.
sounakr May 8, 2024
607e81a
Partitioned HNSW Deeplake Side Changes. Deserialization changes for i…
sounakr May 21, 2024
2f67b52
Partitioned HNSW Deeplake Side Changes. Test case addition.
sounakr May 23, 2024
3d153bc
Partitioned HNSW Deeplake Side Changes. Rectify Test Case
sounakr May 23, 2024
2d1f376
Partitioned HNSW Deeplake Side Changes. Rectify Test Case Phase1
sounakr May 24, 2024
8655c0f
Partitioned HNSW Deeplake Side Changes. Don't compare partition numbe…
sounakr May 25, 2024
ef7e0d0
Partitioned HNSW Deeplake Side Changes.
sounakr May 8, 2024
140da23
Partitioned HNSW Deeplake Side Changes. Deserialization changes for i…
sounakr May 21, 2024
00d3927
Partitioned HNSW Deeplake Side Changes. Test case addition.
sounakr May 23, 2024
6856074
Partitioned HNSW Deeplake Side Changes. Rectify Test Case
sounakr May 23, 2024
88d48ba
Partitioned HNSW Deeplake Side Changes. Rectify Test Case Phase1
sounakr May 24, 2024
4486016
Partitioned HNSW Deeplake Side Changes. Don't compare partition numbe…
sounakr May 25, 2024
c3cc62f
bumb libdeeplake to 0.0.130
activesoull May 25, 2024
b906611
libdeeplake to 0.0.129
azat-manukyan May 27, 2024
3e2f2af
Bump libdeeplake version.
khustup2 Jun 4, 2024
f01b6d0
- Merge branch 'main' of https://github.com/activeloopai/deeplake in…
sounakr Jun 5, 2024
9058dbc
Reformatting.
sounakr Jun 5, 2024
c6857de
- Merge branch 'main' of https://github.com/activeloopai/deeplake in…
sounakr Jun 5, 2024
6a7fc70
Bump Up version.
sounakr Jun 5, 2024
9e6e1d3
Adding Incremental test cases.
sounakr Jun 5, 2024
6ae45da
Fixing Test cases.
sounakr Jun 5, 2024
817e1ef
Bump libdeeplake version.
khustup2 Jun 5, 2024
6ac1a9b
Bump libdeeplake version.
khustup2 Jun 12, 2024
addca99
Merge branch 'main' into partitioned_hnsw
khustup2 Jun 12, 2024
1 change: 1 addition & 0 deletions deeplake/constants.py
@@ -333,6 +333,7 @@
"additional_params": {
"efConstruction": 600,
"M": 32,
"partitions": 1,
},
}
VECTORSTORE_EXTEND_BATCH_SIZE = 500
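The new default means every HNSW index configuration now carries a "partitions" value, with a single partition unless the caller asks for more. Below is a hedged usage sketch of requesting a multi-partition index; the dataset path and tensor name are placeholders, and passing additional_params as a keyword to create_vdb_index is an assumption based on the create_vdb_index changes shown further down in this diff.

# Hedged usage sketch (placeholder dataset/tensor names; the additional_params
# keyword is assumed from the create_vdb_index changes later in this PR):
import deeplake

ds = deeplake.load("hub://org/dataset")  # placeholder path
emb_tensor = ds.embedding                # placeholder embedding tensor
emb_tensor.create_vdb_index(
    "hnsw_1",
    distance="COS",
    additional_params={"efConstruction": 600, "M": 32, "partitions": 4},
)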
47 changes: 40 additions & 7 deletions deeplake/core/index_maintenance.py
@@ -62,6 +62,21 @@ def index_exists(dataset):
return False


def index_partition_count(dataset):
emb_tensor = fetch_embedding_tensor(dataset)
if emb_tensor is not None:
vdb_indexes = emb_tensor.fetch_vdb_indexes()
if len(vdb_indexes) == 0:
return 1
else:
additional_params = vdb_indexes[0].get("additional_params", {})
if additional_params is None:
return 1
return additional_params.get("partitions", 1)
else:
return 1


def index_used(exec_option):
"""Check if the index is used for the exec_option"""
return exec_option in ("tensor_db", "compute_engine")
@@ -101,8 +116,17 @@ def check_index_params(self):

existing_distance = existing_params.get("distance", "COS")
if curr_distance == existing_distance:
current_additional_params_dict = current_params.get("additional_params", None)
existing_additional_params_dict = existing_params.get("additional_params", None)
current_additional_params_dict = current_params.get(
"additional_params", {}
).copy()
existing_additional_params_dict = existing_params.get(
"additional_params", {}
).copy()

# Remove the 'partitions' key from the copies of the dictionaries
current_additional_params_dict.pop("partitions", None)
existing_additional_params_dict.pop("partitions", None)

if current_additional_params_dict == existing_additional_params_dict:
return True
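Since both copies drop "partitions" before the comparison, two configurations that differ only in partition count are treated as the same index parameters and do not trigger a rebuild. A minimal illustration with made-up values:

# Minimal illustration of the comparison above (values are illustrative):
current = {"efConstruction": 600, "M": 32, "partitions": 4}
existing = {"efConstruction": 600, "M": 32, "partitions": 1}
a, b = current.copy(), existing.copy()
a.pop("partitions", None)
b.pop("partitions", None)
assert a == b  # differing partition counts alone keep check_index_params True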

@@ -134,9 +158,9 @@ def get_index_metric(metric):


def normalize_additional_params(params: dict) -> dict:
mapping = {"efconstruction": "efConstruction", "m": "M"}
mapping = {"efconstruction": "efConstruction", "m": "M", "partitions": "partitions"}

allowed_keys = ["efConstruction", "m"]
allowed_keys = ["efConstruction", "m", "partitions"]

# New dictionary to store the result with desired key format
result_dict = {}
@@ -180,7 +204,9 @@ def check_vdb_indexes(dataset):
return False


def _incr_maintenance_vdb_indexes(tensor, indexes, index_operation):
def _incr_maintenance_vdb_indexes(
tensor, indexes, index_operation, is_partitioned=False
):
try:
is_embedding = tensor.htype == "embedding"
has_vdb_indexes = hasattr(tensor.meta, "vdb_indexes")
@@ -194,6 +220,7 @@ def _incr_maintenance_vdb_indexes(tensor, indexes, index_operation):
tensor.update_vdb_index(
operation_kind=index_operation,
row_ids=indexes,
is_partitioned=is_partitioned,
)
except Exception as e:
raise Exception(f"An error occurred while regenerating VDB indexes: {e}")
@@ -247,7 +274,6 @@ def index_operation_dataset(self, dml_type, rowids):

if index_operation_type == INDEX_OP_TYPE.NOOP:
return

if (
index_operation_type == INDEX_OP_TYPE.CREATE_INDEX
or index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX
@@ -272,6 +298,13 @@ def index_operation_dataset(self, dml_type, rowids):
else:
emb_tensor.create_vdb_index("hnsw_1", distance=distance)
elif index_operation_type == INDEX_OP_TYPE.INCREMENTAL_INDEX:
_incr_maintenance_vdb_indexes(emb_tensor, rowids, dml_type)
partition_count = index_partition_count(self)
print(f"Partition count: {partition_count}")
if partition_count > 1:
_incr_maintenance_vdb_indexes(
emb_tensor, rowids, dml_type, is_partitioned=True
)
else:
_incr_maintenance_vdb_indexes(emb_tensor, rowids, dml_type)
else:
raise Exception("Unknown index operation")
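Tying the helpers above together: index_partition_count reads the "partitions" value stored in the first VDB index's additional_params, and index_operation_dataset passes is_partitioned=True to the incremental path only when that count exceeds one. A hedged sketch of one metadata entry as returned by fetch_vdb_indexes follows; the key names come from the code above, the concrete values are illustrative.

# Illustrative entry from emb_tensor.fetch_vdb_indexes(); only the key names
# are taken from this diff, the values are made up:
vdb_index = {
    "id": "hnsw_1",
    "distance": "COS",
    "additional_params": {"efConstruction": 600, "M": 32, "partitions": 4},
}
partitions = (vdb_index.get("additional_params") or {}).get("partitions", 1)
# partitions == 4, so the INCREMENTAL_INDEX branch above calls
# _incr_maintenance_vdb_indexes(emb_tensor, rowids, dml_type, is_partitioned=True)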
12 changes: 12 additions & 0 deletions deeplake/core/meta/tensor_meta.py
@@ -99,6 +99,18 @@ def get_vdb_index_ids(self):
index_ids.append(index["id"])
return index_ids

def update_vdb_partition(self, id: str, partition: int):
if not self.contains_vdb_index(id):
raise ValueError(f"Tensor meta has no vdb index with name '{id}'.")
for index in self.vdb_indexes:
if id == index["id"]:
additional_param = index["additional_params"]
if not isinstance(additional_param, dict):
raise ValueError("additional_params must be a dictionary")
additional_param["partitions"] = partition
self.is_dirty = True
return

def add_vdb_index(self, id: str, type: str, distance: str, **kwargs):
if self.contains_vdb_index(id):
raise ValueError(f"Tensor meta already has a vdb index with name '{id}'.")
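update_vdb_partition is called from Tensor.update_vdb_index later in this diff to record how many partitions the index holds after an incremental update. A short hedged sketch of its effect; the index id and count are illustrative.

# Hedged sketch; "hnsw_1" is an illustrative index id:
meta.update_vdb_partition("hnsw_1", 4)
# -> the "hnsw_1" entry's additional_params["partitions"] becomes 4 and the
#    meta is marked dirty so the new partition count is persisted.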
185 changes: 180 additions & 5 deletions deeplake/core/tensor.py
@@ -7,6 +7,7 @@
from deeplake.core.version_control.commit_chunk_map import CommitChunkMap
from deeplake.core.version_control.commit_diff import CommitDiff
from deeplake.core.chunk.base_chunk import InputSample
import json
import numpy as np
from typing import Dict, List, Sequence, Union, Optional, Tuple, Any, Callable
from functools import reduce, partial
@@ -1529,10 +1530,71 @@ def invalidate_libdeeplake_dataset(self):
"""Invalidates the libdeeplake dataset object."""
self.dataset.libdeeplake_dataset = None

def deserialize_partitions(self, serialized_data, incremental_dml=False):
from io import BytesIO

stream = BytesIO(serialized_data)

# Read number of partitions
num_partitions = int.from_bytes(
stream.read(8), "little"
) # Assuming size_t is 8 bytes

partition_info = []
for _ in range(num_partitions):
# Read partition name length and name
name_length = int.from_bytes(stream.read(8), "little")
name = stream.read(name_length).decode("utf-8")

# Read start and end indices
start = int.from_bytes(stream.read(8), "little")
end = int.from_bytes(stream.read(8), "little")

partition_info.append({"name": name, "start": start, "end": end})

incr_info = []
if incremental_dml == True:
# Check for incremental update info
incr_info_size = int.from_bytes(stream.read(8), "little")
for _ in range(incr_info_size):
name_length = int.from_bytes(stream.read(8), "little")
name = stream.read(name_length).decode("utf-8")

start = int.from_bytes(stream.read(8), "little")
end = int.from_bytes(stream.read(8), "little")

incr_info.append({"name": name, "start": start, "end": end})

# Extract the actual data for each partition
partitions_data = []
while True:
size_data = stream.read(8)
if not size_data:
break
size = int.from_bytes(size_data, "little")
partition_blob = stream.read(size)
partitions_data.append(partition_blob)

return partition_info, partitions_data, incr_info
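All integers in this layout are read as 8-byte little-endian values: a partition count, then (name length, name, start, end) per partition, an optional block of incremental-update entries in the same shape, and finally a sequence of (size, blob) pairs until the stream is exhausted. The sketch below builds such a buffer, which can be useful for unit-testing the reader; the helper name and values are illustrative and not part of this PR.

# Hedged inverse of deserialize_partitions, for constructing test fixtures only
# (not part of this PR); mirrors the 8-byte little-endian reads above:
from io import BytesIO

def serialize_partitions_for_test(partition_info, partitions_data, incr_info=None):
    buf = BytesIO()
    buf.write(len(partition_info).to_bytes(8, "little"))
    for info in partition_info:
        name = info["name"].encode("utf-8")
        buf.write(len(name).to_bytes(8, "little"))
        buf.write(name)
        buf.write(info["start"].to_bytes(8, "little"))
        buf.write(info["end"].to_bytes(8, "little"))
    if incr_info is not None:  # present only for incremental DML payloads
        buf.write(len(incr_info).to_bytes(8, "little"))
        for info in incr_info:
            name = info["name"].encode("utf-8")
            buf.write(len(name).to_bytes(8, "little"))
            buf.write(name)
            buf.write(info["start"].to_bytes(8, "little"))
            buf.write(info["end"].to_bytes(8, "little"))
    for blob in partitions_data:
        buf.write(len(blob).to_bytes(8, "little"))
        buf.write(blob)
    return buf.getvalue()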

def is_partitioned_vdb_index(self):
vdb_indexes = self.get_vdb_indexes()
if len(vdb_indexes) == 0:
return False
for vdb_index in vdb_indexes:
if (
vdb_index["additional_params"] is not None
and "partitions" in vdb_index["additional_params"]
and vdb_index["additional_params"]["partitions"] > 1
):
return True
return False

def update_vdb_index(
self,
operation_kind: int,
row_ids: List[int] = [],
is_partitioned: bool = False,
):
self.storage.check_readonly()
if self.meta.htype != "embedding":
@@ -1561,7 +1623,31 @@ def update_vdb_index(
for id, index in indexes:
b = index.serialize()
commit_id = self.version_state["commit_id"]
self.storage[get_tensor_vdb_index_key(self.key, commit_id, id)] = b
if is_partitioned:
metadata, partitions_data, incr_info = (
self.deserialize_partitions(b, incremental_dml=True)
)
partition_key = get_tensor_vdb_index_key(
self.key, commit_id, f"{id}_partition_metadata"
)
metadata_json = json.dumps(metadata)
metadata_bytes = metadata_json.encode("utf-8")
self.storage[partition_key] = metadata_bytes

incr_data_map = {
info["name"]: data
for info, data in zip(incr_info, partitions_data)
}
for info in incr_info:
partition_key = get_tensor_vdb_index_key(
self.key, commit_id, f"{id}_{info['name']}"
)
self.storage[partition_key] = incr_data_map[info["name"]]
self.meta.update_vdb_partition(id, len(metadata))
else:
self.storage[
get_tensor_vdb_index_key(self.key, commit_id, id)
] = b
self.storage.flush()
except:
raise
@@ -1574,7 +1660,30 @@ def update_vdb_index(
for id, index in indexes:
b = index.serialize()
commit_id = self.version_state["commit_id"]
self.storage[get_tensor_vdb_index_key(self.key, commit_id, id)] = b
if is_partitioned:
metadata, partitions_data, incr_info = (
self.deserialize_partitions(b, incremental_dml=True)
)
partition_key = get_tensor_vdb_index_key(
self.key, commit_id, f"{id}_partition_metadata"
)
metadata_json = json.dumps(metadata)
metadata_bytes = metadata_json.encode("utf-8")
self.storage[partition_key] = metadata_bytes

incr_data_map = {
info["name"]: data
for info, data in zip(incr_info, partitions_data)
}
for info in incr_info:
partition_key = get_tensor_vdb_index_key(
self.key, commit_id, f"{id}_{info['name']}"
)
self.storage[partition_key] = incr_data_map[info["name"]]
else:
self.storage[
get_tensor_vdb_index_key(self.key, commit_id, id)
] = b
self.storage.flush()
except:
raise
@@ -1587,7 +1696,30 @@ def update_vdb_index(
for id, index in indexes:
b = index.serialize()
commit_id = self.version_state["commit_id"]
self.storage[get_tensor_vdb_index_key(self.key, commit_id, id)] = b
if is_partitioned:
metadata, partitions_data, incr_info = (
self.deserialize_partitions(b, incremental_dml=True)
)
partition_key = get_tensor_vdb_index_key(
self.key, commit_id, f"{id}_partition_metadata"
)
metadata_json = json.dumps(metadata)
metadata_bytes = metadata_json.encode("utf-8")
self.storage[partition_key] = metadata_bytes

incr_data_map = {
info["name"]: data
for info, data in zip(incr_info, partitions_data)
}
for info in incr_info:
partition_key = get_tensor_vdb_index_key(
self.key, commit_id, f"{id}_{info['name']}"
)
self.storage[partition_key] = incr_data_map[info["name"]]
else:
self.storage[
get_tensor_vdb_index_key(self.key, commit_id, id)
] = b
self.storage.flush()
self.storage.flush()
except:
@@ -1634,7 +1766,27 @@ def create_vdb_index(
)
b = index.serialize()
commit_id = self.version_state["commit_id"]
self.storage[get_tensor_vdb_index_key(self.key, commit_id, id)] = b
# Check if the index is partitioned
if (
additional_params
and "partitions" in additional_params
and additional_params["partitions"] > 1
):
metadata, partitions_data, incr_info = self.deserialize_partitions(b)
partition_key = get_tensor_vdb_index_key(
self.key, commit_id, f"{id}_partition_metadata"
)
metadata_json = json.dumps(metadata)
metadata_bytes = metadata_json.encode("utf-8")
self.storage[partition_key] = metadata_bytes
for i, data in enumerate(partitions_data):
partition_key = get_tensor_vdb_index_key(
self.key, commit_id, f"{id}_part_{i}"
)
self.storage[partition_key] = data
else:
self.storage[get_tensor_vdb_index_key(self.key, commit_id, id)] = b

self.invalidate_libdeeplake_dataset()
except:
self.meta.remove_vdb_index(id=id)
@@ -1647,7 +1799,30 @@ def delete_vdb_index(self, id: str):
raise Exception(f"Only supported for embedding tensors.")
commit_id = self.version_state["commit_id"]
self.unload_vdb_index_cache()
self.storage.pop(get_tensor_vdb_index_key(self.key, commit_id, id))
if self.is_partitioned_vdb_index():
metadata_file = self.storage[
get_tensor_vdb_index_key(
self.key,
self.version_state["commit_id"],
f"{id}_partition_metadata",
)
]
metadata = json.loads(metadata_file.decode("utf-8"))
for part in metadata:
partition_key = get_tensor_vdb_index_key(
self.key, self.version_state["commit_id"], f"{id}_{part['name']}"
)
self.storage.pop(partition_key)
self.storage.pop(
get_tensor_vdb_index_key(
self.key,
self.version_state["commit_id"],
f"{id}_partition_metadata",
)
)
else:
self.storage.pop(get_tensor_vdb_index_key(self.key, commit_id, id))

self.meta.remove_vdb_index(id=id)
self.invalidate_libdeeplake_dataset()
self.storage.flush()
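Taken together, create_vdb_index, update_vdb_index and delete_vdb_index store a partitioned index as one JSON metadata blob plus one serialized blob per partition, all addressed through get_tensor_vdb_index_key. The sketch below shows the resulting key layout; the exact key prefix produced by get_tensor_vdb_index_key is not part of this diff, and the partition names follow the "part_<i>" pattern used by create_vdb_index.

# Illustrative storage layout for a partitioned index "hnsw_1" (the key prefix
# is whatever get_tensor_vdb_index_key produces; values are made up):
#
#   <prefix>/hnsw_1_partition_metadata  -> JSON list such as
#       [{"name": "part_0", "start": 0, "end": 100000},
#        {"name": "part_1", "start": 100000, "end": 200000}]
#   <prefix>/hnsw_1_part_0              -> serialized HNSW data for partition 0
#   <prefix>/hnsw_1_part_1              -> serialized HNSW data for partition 1
#
# delete_vdb_index reads the metadata JSON, pops "<prefix>/hnsw_1_<name>" for
# each recorded partition, then pops the metadata key itself.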