diff --git a/tests/test_data_source.py b/tests/test_data_source.py
index a2b1d0ed8..dd81d09e5 100644
--- a/tests/test_data_source.py
+++ b/tests/test_data_source.py
@@ -49,6 +49,13 @@ def test_openai(self, size):
         openai = Dataset.OPENAI.manager(size)
         self.per_dataset_test(openai)
 
+    @pytest.mark.parametrize("size", [
+        100_000_000,
+    ])
+    def test_laion(self, size):
+        openai = Dataset.LAION.manager(size)
+        self.per_dataset_test(openai)
+
     def per_dataset_test(self, dataset: DatasetManager):
         s3_reader = AwsS3Reader()
diff --git a/tests/test_dataset.py b/tests/test_dataset.py
index f6ba05036..06daf0767 100644
--- a/tests/test_dataset.py
+++ b/tests/test_dataset.py
@@ -1,4 +1,4 @@
-from vectordb_bench.backend.dataset import Dataset, get_files
+from vectordb_bench.backend.dataset import Dataset, compose_train_files
 import logging
 import pytest
 from pydantic import ValidationError
@@ -43,19 +43,15 @@ class TestGetFiles:
         50,
         100,
     ])
-    @pytest.mark.parametrize("with_gt", [True, False])
-    def test_train_count(self, train_count, with_gt):
-        files = get_files(train_count, True, with_gt)
+    def test_train_count(self, train_count):
+        files = compose_train_files(train_count, True)
         log.info(files)
-        if with_gt:
-            assert len(files) - 4 == train_count
-        else:
-            assert len(files) - 1 == train_count
+        assert len(files) == train_count
 
     @pytest.mark.parametrize("use_shuffled", [True, False])
     def test_use_shuffled(self, use_shuffled):
-        files = get_files(1, use_shuffled, True)
+        files = compose_train_files(1, use_shuffled)
         log.info(files)
 
         trains = [f for f in files if "train" in f]
diff --git a/vectordb_bench/backend/dataset.py b/vectordb_bench/backend/dataset.py
index a227b4147..e0b4e90b8 100644
--- a/vectordb_bench/backend/dataset.py
+++ b/vectordb_bench/backend/dataset.py
@@ -22,7 +22,7 @@
 
 log = logging.getLogger(__name__)
 
-SizeLabel = namedtuple('SizeLabel', ['size', 'label', 'files'])
+SizeLabel = namedtuple('SizeLabel', ['size', 'label', 'file_count'])
 
 
 class BaseDataset(BaseModel):
@@ -31,6 +31,7 @@ class BaseDataset(BaseModel):
     dim: int
     metric_type: MetricType
     use_shuffled: bool
+    with_gt: bool = False
     _size_label: dict[int, SizeLabel] = PrivateAttr()
 
     @validator("size")
@@ -48,34 +49,39 @@ def dir_name(self) -> str:
         return f"{self.name}_{self.label}_{utils.numerize(self.size)}".lower()
 
     @property
-    def files(self) -> str:
-        return self._size_label.get(self.size).files
+    def file_count(self) -> int:
+        return self._size_label.get(self.size).file_count
 
 
-def get_files(train_count: int, use_shuffled: bool, with_gt: bool = True) -> list[str]:
+def compose_train_files(train_count: int, use_shuffled: bool) -> list[str]:
     prefix = "shuffle_train" if use_shuffled else "train"
     middle = f"of-{train_count}"
     surfix = "parquet"
     train_files = []
     if train_count > 1:
-        just_size = len(str(train_count))
+        just_size = 2
         for i in range(train_count):
             sub_file = f"{prefix}-{str(i).rjust(just_size, '0')}-{middle}.{surfix}"
             train_files.append(sub_file)
     else:
         train_files.append(f"{prefix}.{surfix}")
 
-    files = ['test.parquet']
-    if with_gt:
-        files.extend([
-            'neighbors.parquet',
-            'neighbors_tail_1p.parquet',
-            'neighbors_head_1p.parquet',
-        ])
+    return train_files
+
+
+def compose_gt_file(filters: int | float | str | None = None) -> str:
+    if filters is None:
+        return "neighbors.parquet"
+
+    if filters == 0.01:
+        return "neighbors_head_1p.parquet"
+
+    if filters == 0.99:
+        return "neighbors_tail_1p.parquet"
+
+    raise ValueError(f"Filters not supported: {filters}")
 
-    files.extend(train_files)
-    return files
 
 class LAION(BaseDataset):
@@ -83,8 +89,9 @@ class LAION(BaseDataset):
     dim: int = 768
     metric_type: MetricType = MetricType.L2
     use_shuffled: bool = False
+    with_gt: bool = True
     _size_label: dict = {
-        100_000_000: SizeLabel(100_000_000, "LARGE", get_files(100, False)),
+        100_000_000: SizeLabel(100_000_000, "LARGE", 100),
     }
 
 
@@ -94,8 +101,8 @@ class GIST(BaseDataset):
     metric_type: MetricType = MetricType.L2
     use_shuffled: bool = False
     _size_label: dict = {
-        100_000: SizeLabel(100_000, "SMALL", get_files(1, False, False)),
-        1_000_000: SizeLabel(1_000_000, "MEDIUM", get_files(1, False, False)),
+        100_000: SizeLabel(100_000, "SMALL", 1),
+        1_000_000: SizeLabel(1_000_000, "MEDIUM", 1),
     }
 
 
@@ -104,10 +111,11 @@ class Cohere(BaseDataset):
     dim: int = 768
     metric_type: MetricType = MetricType.COSINE
     use_shuffled: bool = config.USE_SHUFFLED_DATA
+    with_gt: bool = True
     _size_label: dict = {
-        100_000: SizeLabel(100_000, "SMALL", get_files(1, config.USE_SHUFFLED_DATA)),
-        1_000_000: SizeLabel(1_000_000, "MEDIUM", get_files(1, config.USE_SHUFFLED_DATA)),
-        10_000_000: SizeLabel(10_000_000, "LARGE", get_files(10, config.USE_SHUFFLED_DATA)),
+        100_000: SizeLabel(100_000, "SMALL", 1),
+        1_000_000: SizeLabel(1_000_000, "MEDIUM", 1),
+        10_000_000: SizeLabel(10_000_000, "LARGE", 10),
     }
 
 
@@ -116,7 +124,7 @@ class Glove(BaseDataset):
     dim: int = 200
     metric_type: MetricType = MetricType.COSINE
     use_shuffled: bool = False
-    _size_label: dict = {1_000_000: SizeLabel(1_000_000, "MEDIUM", get_files(1, False, False))}
+    _size_label: dict = {1_000_000: SizeLabel(1_000_000, "MEDIUM", 1)}
 
 
 class SIFT(BaseDataset):
@@ -125,9 +133,9 @@ class SIFT(BaseDataset):
     metric_type: MetricType = MetricType.L2
     use_shuffled: bool = False
     _size_label: dict = {
-        500_000: SizeLabel(500_000, "SMALL", get_files(1, False, False)),
-        5_000_000: SizeLabel(5_000_000, "MEDIUM", get_files(1, False, False)),
-        # 50_000_000: SizeLabel(50_000_000, "LARGE", get_files(50, False, False)),
+        500_000: SizeLabel(500_000, "SMALL", 1),
+        5_000_000: SizeLabel(5_000_000, "MEDIUM", 1),
+        # 50_000_000: SizeLabel(50_000_000, "LARGE", 50),
     }
 
 
@@ -136,10 +144,11 @@ class OpenAI(BaseDataset):
     dim: int = 1536
     metric_type: MetricType = MetricType.COSINE
     use_shuffled: bool = config.USE_SHUFFLED_DATA
+    with_gt: bool = True
     _size_label: dict = {
-        50_000: SizeLabel(50_000, "SMALL", get_files(1, config.USE_SHUFFLED_DATA)),
-        500_000: SizeLabel(500_000, "MEDIUM", get_files(1, config.USE_SHUFFLED_DATA)),
-        5_000_000: SizeLabel(5_000_000, "LARGE", get_files(10, config.USE_SHUFFLED_DATA)),
+        50_000: SizeLabel(50_000, "SMALL", 1),
+        500_000: SizeLabel(500_000, "MEDIUM", 1),
+        5_000_000: SizeLabel(5_000_000, "LARGE", 10),
     }
 
 
@@ -155,6 +164,7 @@ class DatasetManager(BaseModel):
     """
     data: BaseDataset
     test_data: pd.DataFrame | None = None
+    gt_data: pd.DataFrame | None = None
     train_files : list[str] = []
     reader: DatasetReader | None = None
 
@@ -180,50 +190,51 @@ def data_dir(self) -> pathlib.Path:
     def __iter__(self):
         return DataSetIterator(self)
 
-    def prepare(self, source: DatasetSource=DatasetSource.S3, check: bool=True) -> bool:
+    # TODO: pass use_shuffled from outside
+    def prepare(self,
+                source: DatasetSource=DatasetSource.S3,
+                check: bool=True,
+                filters: int | float | str | None = None,
+    ) -> bool:
         """Download the dataset from DatasetSource
            url = f"{source}/{self.data.dir_name}"
 
-           download files from url to self.data_dir, there'll be 4 types of files in the data_dir
-               - train*.parquet: for training
-               - test.parquet: for testing
-               - neighbors.parquet: ground_truth of the test.parquet
-               - neighbors_head_1p.parquet: ground_truth of the test.parquet after filtering 1% data
-               - neighbors_99p.parquet: ground_truth of the test.parquet after filtering 99% data
-
         Args:
             source(DatasetSource): S3 or AliyunOSS, default as S3
-            check(bool): Whether to do etags check
+            check(bool): Whether to do etags check, default as True
+            filters(Optional[int | float | str]): combined with the dataset's with_gt to
+                compose the correct ground_truth file
 
         Returns:
             bool: whether the dataset is successfully prepared
         """
+        file_count, use_shuffled = self.data.file_count, self.data.use_shuffled
+
+        train_files = compose_train_files(file_count, use_shuffled)
+        all_files = train_files
+
+        gt_file, test_file = None, None
+        if self.data.with_gt:
+            gt_file, test_file = compose_gt_file(filters), "test.parquet"
+            all_files.extend([gt_file, test_file])
+
         source.reader().read(
             dataset=self.data.dir_name.lower(),
-            files=self.data.files,
+            files=all_files,
             local_ds_root=self.data_dir,
             check_etag=check,
         )
 
-        prefix = "shuffle_train" if self.data.use_shuffled else "train"
+        if gt_file is not None and test_file is not None:
+            self.test_data = self._read_file(test_file)
+            self.gt_data = self._read_file(gt_file)
+
+        prefix = "shuffle_train" if use_shuffled else "train"
         self.train_files = sorted([f.name for f in self.data_dir.glob(f'{prefix}*.parquet')])
         log.debug(f"{self.data.name}: available train files {self.train_files}")
-        self.test_data = self._read_file("test.parquet")
 
-        return True
-
-    def get_ground_truth(self, filters: int | float | None = None) -> pd.DataFrame:
-
-        file_name = ""
-        if filters is None:
-            file_name = "neighbors.parquet"
-        elif filters == 0.01:
-            file_name = "neighbors_head_1p.parquet"
-        elif filters == 0.99:
-            file_name = "neighbors_tail_1p.parquet"
-        else:
-            raise ValueError(f"Filters not supported: {filters}")
-        return self._read_file(file_name)
+        return True
 
     def _read_file(self, file_name: str) -> pd.DataFrame:
         """read one file from disk into memory"""
diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py
index 80c5ac1df..0a5f12fd5 100644
--- a/vectordb_bench/backend/task_runner.py
+++ b/vectordb_bench/backend/task_runner.py
@@ -84,7 +84,7 @@ def init_db(self, drop_old: bool = True) -> None:
     def _pre_run(self, drop_old: bool = True):
         try:
             self.init_db(drop_old)
-            self.ca.dataset.prepare(self.dataset_source)
+            self.ca.dataset.prepare(self.dataset_source, filters=self.ca.filter_rate)
         except ModuleNotFoundError as e:
             log.warning(f"pre run case error: please install client for db: {self.config.db}, error={e}")
             raise e from None
@@ -215,7 +215,8 @@ def _init_search_runner(self):
             test_emb = test_emb / np.linalg.norm(test_emb, axis=1)[:, np.newaxis]
         self.test_emb = test_emb.tolist()
 
-        gt_df = self.ca.dataset.get_ground_truth(self.ca.filter_rate)
+        # gt_df = self.ca.dataset.get_ground_truth(self.ca.filter_rate)
+        gt_df = self.ca.dataset.gt_data
 
         self.serial_search_runner = SerialSearchRunner(
             db=self.db,
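
For review convenience, here is a minimal sketch of how the reworked dataset API is meant to fit together after this change. `compose_train_files`, `compose_gt_file`, `prepare()`, and `gt_data` come from the diff above; the `Dataset.COHERE` member, the 1M size, and the 0.01 filter value are illustrative assumptions drawn from the existing tests and size labels, not part of this diff.

```python
from vectordb_bench.backend.dataset import (
    Dataset,
    compose_gt_file,
    compose_train_files,
)

# Train-file names are now derived from the plain file count stored in SizeLabel.
print(compose_train_files(10, use_shuffled=False))
# ['train-00-of-10.parquet', ..., 'train-09-of-10.parquet']

# Ground-truth selection is a separate helper keyed on the filter rate.
print(compose_gt_file(None))   # neighbors.parquet
print(compose_gt_file(0.01))   # neighbors_head_1p.parquet
print(compose_gt_file(0.99))   # neighbors_tail_1p.parquet

# prepare() now downloads train/test/ground-truth files in one pass and caches
# the ground truth on the manager, replacing the removed get_ground_truth().
# Dataset.COHERE and the 1M size are assumed here purely for illustration.
cohere = Dataset.COHERE.manager(1_000_000)
cohere.prepare(filters=0.01)   # defaults: source=DatasetSource.S3, check=True
gt_df = cohere.gt_data         # pd.DataFrame with the ground-truth neighbors
```

Keeping the filename knowledge in dataset.py is what lets task_runner.py drop its call to get_ground_truth() and simply read gt_data after _pre_run().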