fix: Unable to download some files from AliyunOSS
Fixes: zilliztech#267, zilliztech#275, zilliztech#285

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn committed Mar 4, 2024
1 parent b53c7ce commit a0c15f3
Showing 4 changed files with 78 additions and 63 deletions.
7 changes: 7 additions & 0 deletions tests/test_data_source.py
@@ -49,6 +49,13 @@ def test_openai(self, size):
openai = Dataset.OPENAI.manager(size)
self.per_dataset_test(openai)

@pytest.mark.parametrize("size", [
100_000_000,
])
def test_laion(self, size):
laion = Dataset.LAION.manager(size)
self.per_dataset_test(laion)


def per_dataset_test(self, dataset: DatasetManager):
s3_reader = AwsS3Reader()
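For reference, a minimal sketch of preparing the newly tested LAION dataset through the manager API in this commit. The data_source import path and the DatasetSource.AliyunOSS member name are assumptions (the member is mentioned in the prepare() docstring further down):

    from vectordb_bench.backend.dataset import Dataset
    from vectordb_bench.backend.data_source import DatasetSource  # assumed module path

    # 100 train files plus ground truth; see the LAION class in dataset.py below
    laion = Dataset.LAION.manager(100_000_000)
    laion.prepare(source=DatasetSource.AliyunOSS, check=True)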
14 changes: 5 additions & 9 deletions tests/test_dataset.py
@@ -1,4 +1,4 @@
from vectordb_bench.backend.dataset import Dataset, get_files
from vectordb_bench.backend.dataset import Dataset, compose_train_files
import logging
import pytest
from pydantic import ValidationError
@@ -43,19 +43,15 @@ class TestGetFiles:
50,
100,
])
@pytest.mark.parametrize("with_gt", [True, False])
def test_train_count(self, train_count, with_gt):
files = get_files(train_count, True, with_gt)
def test_train_count(self, train_count):
files = compose_train_files(train_count, True)
log.info(files)

if with_gt:
assert len(files) - 4 == train_count
else:
assert len(files) - 1 == train_count
assert len(files) == train_count

@pytest.mark.parametrize("use_shuffled", [True, False])
def test_use_shuffled(self, use_shuffled):
files = get_files(1, use_shuffled, True)
files = compose_train_files(1, use_shuffled)
log.info(files)

trains = [f for f in files if "train" in f]
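The updated assertions hold because compose_train_files() now returns only the train-file names, one per file, with ground-truth files handled separately. An illustration of the expected return values, derived from the function body in dataset.py below:

    compose_train_files(1, use_shuffled=True)   # ["shuffle_train.parquet"]
    compose_train_files(3, use_shuffled=False)  # ["train-00-of-3.parquet",
                                                #  "train-01-of-3.parquet",
                                                #  "train-02-of-3.parquet"]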
115 changes: 63 additions & 52 deletions vectordb_bench/backend/dataset.py
@@ -22,7 +22,7 @@
log = logging.getLogger(__name__)


SizeLabel = namedtuple('SizeLabel', ['size', 'label', 'files'])
SizeLabel = namedtuple('SizeLabel', ['size', 'label', 'file_count'])
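# Editor's note: the third SizeLabel field is a plain integer file count, e.g.
# SizeLabel(5_000_000, "LARGE", 10); the actual file names are composed later by
# compose_train_files() during prepare().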


class BaseDataset(BaseModel):
@@ -31,6 +31,7 @@ class BaseDataset(BaseModel):
dim: int
metric_type: MetricType
use_shuffled: bool
with_gt: bool = False
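# Editor's note: datasets with with_gt=True also ship test.parquet and the
# neighbors*.parquet ground-truth files; prepare() below only downloads them
# when this flag is set.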
_size_label: dict[int, SizeLabel] = PrivateAttr()

@validator("size")
@@ -48,43 +48,49 @@ def dir_name(self) -> str:
return f"{self.name}_{self.label}_{utils.numerize(self.size)}".lower()

@property
def files(self) -> str:
return self._size_label.get(self.size).files
def file_count(self) -> int:
return self._size_label.get(self.size).file_count


def get_files(train_count: int, use_shuffled: bool, with_gt: bool = True) -> list[str]:
def compose_train_files(train_count: int, use_shuffled: bool) -> list[str]:
prefix = "shuffle_train" if use_shuffled else "train"
middle = f"of-{train_count}"
surfix = "parquet"

train_files = []
if train_count > 1:
just_size = len(str(train_count))
just_size = 2
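# Editor's note (assumption, not part of the commit): the padding is fixed at two
# digits so the generated names match the objects stored remotely, e.g.
# "train-00-of-100.parquet"; the old len(str(train_count)) padding produced
# differently named files for some counts, which is presumably why some files
# could not be downloaded.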
for i in range(train_count):
sub_file = f"{prefix}-{str(i).rjust(just_size, '0')}-{middle}.{surfix}"
train_files.append(sub_file)
else:
train_files.append(f"{prefix}.{surfix}")

files = ['test.parquet']
if with_gt:
files.extend([
'neighbors.parquet',
'neighbors_tail_1p.parquet',
'neighbors_head_1p.parquet',
])
return train_files


def compose_gt_file(filters: int | float | str | None = None) -> str:
if filters is None:
return "neighbors.parquet"

if filters == 0.01:
return "neighbors_head_1p.parquet"

if filters == 0.99:
return "neighbors_tail_1p.parquet"

raise ValueError(f"Filters not supported: {filters}")
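# Illustrative mapping, derived from the branches above:
#   compose_gt_file()      -> "neighbors.parquet"
#   compose_gt_file(0.01)  -> "neighbors_head_1p.parquet"  (ground truth after filtering 1% of the data)
#   compose_gt_file(0.99)  -> "neighbors_tail_1p.parquet"  (ground truth after filtering 99% of the data)
#   compose_gt_file(0.5)   -> ValueError: Filters not supported: 0.5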

files.extend(train_files)
return files


class LAION(BaseDataset):
name: str = "LAION"
dim: int = 768
metric_type: MetricType = MetricType.L2
use_shuffled: bool = False
with_gt: bool = True
_size_label: dict = {
100_000_000: SizeLabel(100_000_000, "LARGE", get_files(100, False)),
100_000_000: SizeLabel(100_000_000, "LARGE", 100),
}


@@ -94,8 +101,8 @@ class GIST(BaseDataset):
metric_type: MetricType = MetricType.L2
use_shuffled: bool = False
_size_label: dict = {
100_000: SizeLabel(100_000, "SMALL", get_files(1, False, False)),
1_000_000: SizeLabel(1_000_000, "MEDIUM", get_files(1, False, False)),
100_000: SizeLabel(100_000, "SMALL", 1),
1_000_000: SizeLabel(1_000_000, "MEDIUM", 1),
}


@@ -104,10 +111,11 @@ class Cohere(BaseDataset):
dim: int = 768
metric_type: MetricType = MetricType.COSINE
use_shuffled: bool = config.USE_SHUFFLED_DATA
with_gt: bool = True
_size_label: dict = {
100_000: SizeLabel(100_000, "SMALL", get_files(1, config.USE_SHUFFLED_DATA)),
1_000_000: SizeLabel(1_000_000, "MEDIUM", get_files(1, config.USE_SHUFFLED_DATA)),
10_000_000: SizeLabel(10_000_000, "LARGE", get_files(10, config.USE_SHUFFLED_DATA)),
100_000: SizeLabel(100_000, "SMALL", 1),
1_000_000: SizeLabel(1_000_000, "MEDIUM", 1),
10_000_000: SizeLabel(10_000_000, "LARGE", 10),
}


@@ -116,7 +124,7 @@ class Glove(BaseDataset):
dim: int = 200
metric_type: MetricType = MetricType.COSINE
use_shuffled: bool = False
_size_label: dict = {1_000_000: SizeLabel(1_000_000, "MEDIUM", get_files(1, False, False))}
_size_label: dict = {1_000_000: SizeLabel(1_000_000, "MEDIUM", 1)}


class SIFT(BaseDataset):
@@ -125,9 +133,9 @@ class SIFT(BaseDataset):
metric_type: MetricType = MetricType.L2
use_shuffled: bool = False
_size_label: dict = {
500_000: SizeLabel(500_000, "SMALL", get_files(1, False, False)),
5_000_000: SizeLabel(5_000_000, "MEDIUM", get_files(1, False, False)),
# 50_000_000: SizeLabel(50_000_000, "LARGE", get_files(50, False, False)),
500_000: SizeLabel(500_000, "SMALL", 1),
5_000_000: SizeLabel(5_000_000, "MEDIUM", 1),
# 50_000_000: SizeLabel(50_000_000, "LARGE", 50),
}


@@ -136,10 +144,11 @@ class OpenAI(BaseDataset):
dim: int = 1536
metric_type: MetricType = MetricType.COSINE
use_shuffled: bool = config.USE_SHUFFLED_DATA
with_gt: bool = True
_size_label: dict = {
50_000: SizeLabel(50_000, "SMALL", get_files(1, config.USE_SHUFFLED_DATA)),
500_000: SizeLabel(500_000, "MEDIUM", get_files(1, config.USE_SHUFFLED_DATA)),
5_000_000: SizeLabel(5_000_000, "LARGE", get_files(10, config.USE_SHUFFLED_DATA)),
50_000: SizeLabel(50_000, "SMALL", 1),
500_000: SizeLabel(500_000, "MEDIUM", 1),
5_000_000: SizeLabel(5_000_000, "LARGE", 10),
}


@@ -155,6 +164,7 @@ class DatasetManager(BaseModel):
"""
data: BaseDataset
test_data: pd.DataFrame | None = None
gt_data: pd.DataFrame | None = None
train_files : list[str] = []
reader: DatasetReader | None = None

@@ -180,50 +190,51 @@ def data_dir(self) -> pathlib.Path:
def __iter__(self):
return DataSetIterator(self)

def prepare(self, source: DatasetSource=DatasetSource.S3, check: bool=True) -> bool:
# TODO: pass use_shuffled in from outside
def prepare(self,
source: DatasetSource=DatasetSource.S3,
check: bool=True,
filters: int | float | str | None = None,
) -> bool:
"""Download the dataset from DatasetSource
url = f"{source}/{self.data.dir_name}"
download files from url to self.data_dir; there will be 5 types of files in the data_dir
- train*.parquet: for training
- test.parquet: for testing
- neighbors.parquet: ground_truth of the test.parquet
- neighbors_head_1p.parquet: ground_truth of the test.parquet after filtering 1% data
- neighbors_tail_1p.parquet: ground_truth of the test.parquet after filtering 99% data
Args:
source(DatasetSource): S3 or AliyunOSS, default as S3
check(bool): Whether to do etags check
check(bool): Whether to do etag checks, default as True
filters(Optional[int | float | str]): combined with the dataset's with_gt flag to
compose the correct ground_truth file
Returns:
bool: whether the dataset is successfully prepared
"""
file_count, use_shuffled = self.data.file_count, self.data.use_shuffled

train_files = compose_train_files(file_count, use_shuffled)
all_files = train_files

gt_file, test_file = None, None
if self.data.with_gt:
gt_file, test_file = compose_gt_file(filters), "test.parquet"
all_files.extend([gt_file, test_file])

source.reader().read(
dataset=self.data.dir_name.lower(),
files=self.data.files,
files=all_files,
local_ds_root=self.data_dir,
check_etag=check,
)

prefix = "shuffle_train" if self.data.use_shuffled else "train"
if gt_file is not None and test_file is not None:
self.test_data = self._read_file(test_file)
self.gt_data = self._read_file(gt_file)

prefix = "shuffle_train" if use_shuffled else "train"
self.train_files = sorted([f.name for f in self.data_dir.glob(f'{prefix}*.parquet')])
log.debug(f"{self.data.name}: available train files {self.train_files}")
self.test_data = self._read_file("test.parquet")
return True

def get_ground_truth(self, filters: int | float | None = None) -> pd.DataFrame:

file_name = ""
if filters is None:
file_name = "neighbors.parquet"
elif filters == 0.01:
file_name = "neighbors_head_1p.parquet"
elif filters == 0.99:
file_name = "neighbors_tail_1p.parquet"
else:
raise ValueError(f"Filters not supported: {filters}")
return self._read_file(file_name)
return True

def _read_file(self, file_name: str) -> pd.DataFrame:
"""read one file from disk into memory"""
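Taken together, a hedged sketch of how a runner would use the reworked manager; the Dataset.COHERE member and the AliyunOSS source are assumptions based on the classes and docstring above:

    manager = Dataset.COHERE.manager(1_000_000)
    manager.prepare(source=DatasetSource.AliyunOSS, filters=0.01)
    manager.train_files   # e.g. ["shuffle_train.parquet"], depending on use_shuffled
    manager.test_data     # DataFrame read from test.parquet
    manager.gt_data       # DataFrame read from neighbors_head_1p.parquet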
5 changes: 3 additions & 2 deletions vectordb_bench/backend/task_runner.py
@@ -84,7 +84,7 @@ def init_db(self, drop_old: bool = True) -> None:
def _pre_run(self, drop_old: bool = True):
try:
self.init_db(drop_old)
self.ca.dataset.prepare(self.dataset_source)
self.ca.dataset.prepare(self.dataset_source, filters=self.ca.filter_rate)
except ModuleNotFoundError as e:
log.warning(f"pre run case error: please install client for db: {self.config.db}, error={e}")
raise e from None
@@ -215,7 +215,8 @@ def _init_search_runner(self):
test_emb = test_emb / np.linalg.norm(test_emb, axis=1)[:, np.newaxis]
self.test_emb = test_emb.tolist()

gt_df = self.ca.dataset.get_ground_truth(self.ca.filter_rate)
# gt_df = self.ca.dataset.get_ground_truth(self.ca.filter_rate)
gt_df = self.ca.dataset.gt_data

self.serial_search_runner = SerialSearchRunner(
db=self.db,
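With this change the ground truth is read once during prepare(), selected by the case's filter rate, instead of being loaded again inside _init_search_runner. A rough sketch of the resulting flow, using the attribute names from the diffs above:

    # in _pre_run: download the dataset and cache the matching ground truth
    self.ca.dataset.prepare(self.dataset_source, filters=self.ca.filter_rate)

    # later, in _init_search_runner: reuse the cached DataFrame
    gt_df = self.ca.dataset.gt_data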
