diff --git a/configs/data_processing/lorem_ipsum_filter_pipeline_config.yaml b/configs/data_processing/lorem_ipsum_filter_pipeline_config.yaml
new file mode 100644
index 00000000..2b09417b
--- /dev/null
+++ b/configs/data_processing/lorem_ipsum_filter_pipeline_config.yaml
@@ -0,0 +1,21 @@
+params:
+  score_path: /raid/s3/opengptx/jude/repos/ml_filter/data/filtering_folder/annotations
+  tokenized_data_path: /raid/s3/opengptx/jude/repos/ml_filter/data/filtering_folder/tokenized
+  output_folder: /raid/s3/opengptx/jude/repos/ml_filter/data/filtering_folder/outputs
+
+  thresholds:
+    score_Gemma_Snowflake: 3.0
+    score_Llama_Snowflake: 2.0
+
+  base_file_prefix: /raid/s3/opengptx/jude/repos/ml_filter/data/filtering_folder/annotations
+  tokenized_data_extension: .pbin
+
+running_on_slurm: false
+
+local_settings:
+  tasks: 1
+  local_tasks: 1
+  local_rank_offset: 0
+  logging_dir: null
+
+slurm_settings: null
diff --git a/configs/data_processing/lorem_ipsum_filter_pipeline_config_slurm.yaml b/configs/data_processing/lorem_ipsum_filter_pipeline_config_slurm.yaml
new file mode 100644
index 00000000..d50f17bb
--- /dev/null
+++ b/configs/data_processing/lorem_ipsum_filter_pipeline_config_slurm.yaml
@@ -0,0 +1,34 @@
+params:
+  score_path: /leonardo_work/EUHPC_D21_101/alexj/repos/data/annotations
+  tokenized_data_path: /leonardo_work/EUHPC_D21_101/alexj/repos/data/tokenized
+  output_folder: /leonardo_work/EUHPC_D21_101/alexj/repos/data/outputs
+
+  thresholds:
+    score_Gemma_Snowflake: 3.0
+    score_Llama_Snowflake: 2.0
+
+  base_file_prefix: /leonardo_work/EUHPC_D21_101/alexj/repos/data/annotations
+  tokenized_data_extension: .pbin
+
+running_on_slurm: true
+
+local_settings: null
+
+slurm_settings:
+  sbatch_args:
+    account: "EUHPC_E05_119"
+    nodes: 1
+    ntasks: 1
+    gres: gpu:1
+    partition: "boost_usr_prod"
+    time: "00:30:00"
+    cpus_per_task: 32
+    gpus_per_task: 1
+    mem_per_gpu: "8G"
+    job_name: "lorem_ipsum_filtering"
+    output: /data/cat/ws/alju972f-annotation_at_scale/.vscode/data/embedding_output_dir/scripts/slurm_output/%j.out
+    error: /data/cat/ws/alju972f-annotation_at_scale/.vscode/data/embedding_output_dir/scripts/slurm_output/%j.err
+  qos: "boost_qos_dbg"  # "normal"
+  venv_path: /leonardo_work/EUHPC_D21_101/alexj/repos/scripts/env/venv_annotation_pipeline/bin/activate
+  tasks: 1
+  workers: 1
\ No newline at end of file
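The two configs above are consumed by the `FilterPipelineBuilder` settings class added below. As a minimal usage sketch (it assumes the directories named in the config already exist), the local config can be wired in through the `FILTER_PIPELINE_YAML_FILE` environment variable:

```python
import os

from ml_filter.data_processing.score_based_filtering.filter_pipeline import FilterPipelineBuilder, run_pipeline

# Point the settings class at one of the YAML configs added in this PR; individual values
# can still be overridden via init kwargs or environment variables (prefix "filter_pipeline_").
os.environ["FILTER_PIPELINE_YAML_FILE"] = "configs/data_processing/lorem_ipsum_filter_pipeline_config.yaml"

builder = FilterPipelineBuilder()  # settings are loaded from the YAML file
run_pipeline(builder)              # running_on_slurm is false, so a LocalPipelineExecutor is built and run
```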
diff --git a/src/ml_filter/data_processing/score_based_filtering/__init__.py b/src/ml_filter/data_processing/score_based_filtering/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py
new file mode 100644
index 00000000..5982e49d
--- /dev/null
+++ b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py
@@ -0,0 +1,235 @@
+from __future__ import annotations
+
+import os
+import sys
+from pathlib import Path
+
+from datatrove.executor import LocalPipelineExecutor, SlurmPipelineExecutor
+from datatrove.pipeline.base import PipelineStep
+from pydantic import BaseModel, Field, model_validator
+from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, SettingsConfigDict, YamlConfigSettingsSource
+
+from ml_filter.data_processing.score_based_filtering.step_data_filtering import DataFiltering
+from ml_filter.data_processing.score_based_filtering.step_score_parsing import ScoresParser
+
+
+class FilterPipelineBuilder(BaseSettings):
+    """Configuration parameters and builder for the score-based filtering pipeline.
+
+    This class defines the settings for running a data filtering pipeline that processes datasets based on scores.
+    It includes parameters for both local and Slurm execution environments.
+    The pipeline consists of steps for parsing scores and filtering datasets based on those scores.
+
+    Besides initializing this class directly, it can also be configured using a YAML file or environment variables.
+    The YAML file can be specified using the `FILTER_PIPELINE_YAML_FILE` environment variable.
+    If no YAML file is provided, the class will use default settings and environment variables.
+    """
+
+    model_config = SettingsConfigDict(env_prefix="filter_pipeline_", env_nested_delimiter="__")
+
+    # Pipeline configuration parameters
+    params: FilterPipelineParameters
+
+    # Execution parameters
+    running_on_slurm: bool = False
+    local_settings: LocalExecutionSettings | None = None
+    slurm_settings: SlurmExecutionSettings | None = None
+
+    @model_validator(mode="after")
+    def slurm_vs_local(self):
+        if self.running_on_slurm and self.local_settings is not None:
+            raise ValueError("Running on Slurm requires slurm execution settings, not local settings.")
+        if self.running_on_slurm and self.slurm_settings is None:
+            self.slurm_settings = SlurmExecutionSettings()
+        elif not self.running_on_slurm and self.slurm_settings is not None:
+            raise ValueError("Running locally requires local execution settings, not Slurm settings.")
+        if not self.running_on_slurm and self.local_settings is None:
+            self.local_settings = LocalExecutionSettings()
+        return self
+
+    @model_validator(mode="after")
+    def set_logging_dir(self):
+        if self.local_settings is not None and self.local_settings.logging_dir is None:
+            self.local_settings.logging_dir = str(self.params.output_folder / "logs")
+        if self.slurm_settings is not None and self.slurm_settings.logging_dir is None:
+            self.slurm_settings.logging_dir = str(self.params.output_folder / "logs")
+        return self
+
+    def build_pipeline_executor(self) -> LocalPipelineExecutor | SlurmPipelineExecutor:
+        """Builds the appropriate pipeline executor based on the execution settings."""
+        pipeline = self._build_pipeline()
+        if self.running_on_slurm:
+            return SlurmPipelineExecutor(pipeline=pipeline, **self.slurm_settings.model_dump())
+        else:
+            return LocalPipelineExecutor(pipeline=pipeline, **self.local_settings.model_dump())
+
+    def _build_pipeline(self) -> list[PipelineStep]:
+        """Builds the pipeline based on the provided configuration."""
+        return build_pipeline(
+            score_path=self.params.score_path,
+            tokenized_data_path=self.params.tokenized_data_path,
+            output_folder=self.params.output_folder,
+            thresholds=self.params.thresholds,
+            base_file_prefix=self.params.base_file_prefix,
+            tokenized_data_extension=self.params.tokenized_data_extension,
+        )
+
+    @classmethod
+    def settings_customise_sources(
+        cls,
+        settings_cls: type[BaseSettings],
+        init_settings: PydanticBaseSettingsSource,
+        env_settings: PydanticBaseSettingsSource,
+        dotenv_settings: PydanticBaseSettingsSource,
+        file_secret_settings: PydanticBaseSettingsSource,
+    ) -> tuple[PydanticBaseSettingsSource, ...]:
+        return (
+            init_settings,
+            env_settings,
+            YamlConfigSettingsSource(settings_cls, yaml_file=os.getenv("FILTER_PIPELINE_YAML_FILE")),
+            dotenv_settings,
+            file_secret_settings,
+        )
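The builder can also be constructed directly from Python, without a YAML file or environment variables. A sketch under assumed placeholder paths (score_path and output_folder must be existing directories, since `build_pipeline` asserts on them):

```python
from pathlib import Path

from ml_filter.data_processing.score_based_filtering.filter_pipeline import (
    FilterPipelineBuilder,
    FilterPipelineParameters,
)

# Placeholder paths for illustration only.
builder = FilterPipelineBuilder(
    params=FilterPipelineParameters(
        score_path=Path("/data/annotations"),
        tokenized_data_path=Path("/data/tokenized"),
        output_folder=Path("/data/outputs"),
        thresholds={"score_Gemma_Snowflake": 3.0, "score_Llama_Snowflake": 2.0},
    ),
    running_on_slurm=False,  # the validator fills in default LocalExecutionSettings and a logging dir
)
executor = builder.build_pipeline_executor()  # a LocalPipelineExecutor in this case
executor.run()
```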
+
+
+class FilterPipelineParameters(BaseModel):
+    """Parameters for the score-based filtering pipeline."""
+
+    score_path: Path = Field(..., description="The path to the directory containing JSONL files with scores.")
+    tokenized_data_path: Path = Field(..., description="The path for the tokenized data files.")
+    output_folder: Path = Field(..., description="The folder where the filtered datasets will be saved.")
+    thresholds: dict[str, float] = Field(
+        ..., description="Dictionary where keys are score names and values are thresholds to filter samples."
+    )
+    base_file_prefix: Path = Field(
+        default=Path(""),
+        description="The prefix path for the raw/base files. This prefix will be removed "
+        "when mapping from the raw files to the corresponding tokenized files.",
+    )
+    tokenized_data_extension: str = Field(
+        default=".pbin", description="The file extension for the tokenized data files."
+    )
+
+
+class LocalExecutionSettings(BaseModel):
+    """Settings for running the pipeline locally."""
+
+    tasks: int = 1
+    local_tasks: int = 1
+    local_rank_offset: int = 0
+    logging_dir: str | None = None
+
+
+class SlurmExecutionSettings(BaseModel):
+    """Settings for running the pipeline on a Slurm cluster."""
+
+    tasks: int = 1
+    time: str = "00:30:00"
+    partition: str = "default"
+    cpus_per_task: int = 4
+    mem_per_cpu_gb: int = 8
+    workers: int = -1
+    job_name: str = "filtering_pipeline"
+    qos: str = "normal"
+    env_command: str | None = None
+    condaenv: str | None = None
+    venv_path: str | None = None
+    # Allow users to supply any sbatch arg (e.g. nodes, ntasks, gres, account, output, error, gpus-per-task, etc.).
+    # An OmegaConf DictConfig is converted to a plain dict before validation.
+    sbatch_args: dict[str, str | int | float | bool] | None = None
+    max_array_size: int = 1001
+    depends_job_id: str | None = None
+    job_id_position: int = -1
+    logging_dir: str | None = None
+    skip_completed: bool = True
+    slurm_logs_folder: str | None = None
+    max_array_launch_parallel: bool = False
+    stagger_max_array_jobs: int = 0
+    run_on_dependency_fail: bool = False
+    randomize_start_duration: int = 0
+    requeue_signals: tuple[str] | None = ("SIGUSR1",)
+    mail_type: str = "ALL"
+    mail_user: str | None = None
+    requeue: bool = True
+    srun_args: dict[str, str | int | float | bool] | None = None
+    tasks_per_job: int = 1
+
+    @model_validator(mode="before")
+    def _normalize_sbatch(cls, values):  # type: ignore[override]
+        """Normalize sbatch_args only.
+
+        - Convert an OmegaConf DictConfig to a plain dict
+        - Reject values that are not a mapping
+        """
+        from omegaconf import DictConfig as _DictConfig, OmegaConf  # local import
+
+        sbatch_args = values.get("sbatch_args") or {}
+        if isinstance(sbatch_args, _DictConfig):
+            sbatch_args = OmegaConf.to_container(sbatch_args, resolve=True)  # type: ignore[arg-type]
+        if not isinstance(sbatch_args, dict):
+            raise TypeError(f"sbatch_args must be a mapping if provided (got type {type(sbatch_args)})")
+
+        values["sbatch_args"] = sbatch_args
+        return values
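As a sketch of how `sbatch_args` is normalized (the keys shown are only illustrative values): a plain dict passes through unchanged, while an OmegaConf `DictConfig` is converted to a plain dict by the `_normalize_sbatch` validator before pydantic validates the field type.

```python
from omegaconf import OmegaConf

from ml_filter.data_processing.score_based_filtering.filter_pipeline import SlurmExecutionSettings

settings = SlurmExecutionSettings(
    partition="boost_usr_prod",
    time="00:30:00",
    sbatch_args=OmegaConf.create({"account": "EUHPC_E05_119", "nodes": 1, "gres": "gpu:1"}),
)
print(settings.sbatch_args)  # {'account': 'EUHPC_E05_119', 'nodes': 1, 'gres': 'gpu:1'}
```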
+
+
+def run_pipeline(args: FilterPipelineBuilder) -> None:
+    """Runs a datatrove pipeline to filter datasets based on scores.
+
+    Args:
+        args (FilterPipelineBuilder): The configuration parameters for the pipeline.
+    """
+    executor = args.build_pipeline_executor()
+    executor.run()
+
+
+def build_pipeline(
+    score_path: Path,
+    tokenized_data_path: Path,
+    output_folder: Path,
+    thresholds: dict[str, float],
+    base_file_prefix: Path = Path(""),
+    tokenized_data_extension: str = ".pbin",
+) -> list[PipelineStep]:
+    """
+    Builds a datatrove pipeline for filtering datasets based on scores.
+
+    Args:
+        score_path (Path): The path to the directory containing JSONL files with scores.
+        tokenized_data_path (Path): The path for the tokenized data files.
+        output_folder (Path): The folder where the filtered datasets will be saved.
+        thresholds (dict[str, float]): A dictionary where keys are score names and values are the
+            thresholds to filter samples.
+        base_file_prefix (Path): The prefix path for the base files.
+        tokenized_data_extension (str): The file extension for the tokenized data files.
+    Returns:
+        list[PipelineStep]: A list containing the pipeline steps for filtering datasets.
+    """
+    assert score_path.is_dir(), f"Score path {score_path} must be a directory."
+    assert output_folder.is_dir(), f"Output folder {output_folder} must be a directory."
+    assert len(thresholds) > 0, "At least one threshold must be provided."
+    pipeline: list[PipelineStep] = [
+        ScoresParser(
+            data_folder=str(score_path),
+            score_keys=list(thresholds.keys()),
+            tokenized_data_path=tokenized_data_path,
+            base_file_prefix=base_file_prefix,
+            tokenized_data_extension=tokenized_data_extension,
+        ),
+        DataFiltering(
+            output_folder=output_folder,
+            thresholds=thresholds,
+            tokenized_data_path=tokenized_data_path,
+        ),
+    ]
+    return pipeline
+
+
+if __name__ == "__main__":
+    if len(sys.argv) > 1 or not (yaml_file := os.getenv("FILTER_PIPELINE_YAML_FILE")) or not os.path.isfile(yaml_file):
+        print(
+            "This script is intended to be used with a YAML configuration "
+            "file set via the environment variable `FILTER_PIPELINE_YAML_FILE`.\n"
+            "If you want to run it without a YAML file, please import from this module "
+            "and use the FilterPipelineBuilder class directly."
+        )
+        exit(1)
+    args = FilterPipelineBuilder()
+    run_pipeline(args)
diff --git a/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py b/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py
new file mode 100644
index 00000000..d7e889aa
--- /dev/null
+++ b/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py
@@ -0,0 +1,98 @@
+import dataclasses
+import logging
+from pathlib import Path
+from typing import Callable
+
+import numpy as np
+from datatrove.data import Document, DocumentsPipeline
+from datatrove.pipeline.base import PipelineStep
+from numpy.typing import NDArray
+
+from ml_filter.data_processing.score_based_filtering.step_score_parsing import ScoresParser
+
+try:
+    from modalities.dataloader.filter_packed_data import filter_dataset
+except ImportError:
+    logging.error("The filtering pipeline requires the 'modalities' package to be installed.")
+    exit(1)
+
+
+class DataFiltering(PipelineStep):
+    """
+    A class to filter datasets based on scores and specified thresholds.
+
+    This class is designed to be used within a datatrove pipeline.
+    For a given list of score dictionaries, it filters the corresponding tokenized dataset files
+    based on the provided thresholds for each score.
+    The resulting filtered datasets are saved in the specified output folder.
+
+    Args:
+        output_folder (Path): The folder where the filtered datasets will be saved.
+        thresholds (dict[str, float]): A dictionary where keys are score names and values are the
+            thresholds to filter samples.
+        tokenized_data_path (Path): The path for the tokenized data files.
+    Raises:
+        AssertionError: If the output folder is not a directory or if no thresholds are provided.
+    """
+
+    name = "DataFiltering"
+    type = "Filter"
+    _requires_dependencies = []
+
+    def __init__(self, output_folder: Path, thresholds: dict[str, float], tokenized_data_path: Path = Path("")):
+        super().__init__()
+        self._output_folder = output_folder
+        assert self._output_folder.is_dir(), f"Output folder {self._output_folder} must be a directory."
+        self._thresholds = thresholds
+        assert len(self._thresholds) > 0, "At least one threshold must be provided."
+        self._tokenized_data_path = tokenized_data_path
+
+    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
+        for document in data:
+            with self.track_time():
+                self._filter_document(document)
+            yield document
+
+    def _filter_document(self, document: Document):
+        """
+        Filters a single, tokenized dataset based on the scores contained in the document.
+
+        Args:
+            document (Document): The document containing scores and the path to the tokenized data file.
+        Raises:
+            ValueError: If the document does not contain the required keys or if the tokenized file path is invalid.
+        """
+        document: dict[str, list[dict[str, float]] | str] = dataclasses.asdict(document)
+        scores: list[dict[str, float]] = document["metadata"][ScoresParser.SCORE_ENTRIES_KEY]
+        tokenized_file = Path(document["metadata"][ScoresParser.TOKENIZED_FILE_KEY])
+        output_path = self._prepare_output_path(tokenized_file)
+        filter_func = make_filter_func(scores, self._thresholds)
+        filter_dataset(src_path=tokenized_file, dst_path=output_path, filter_func=filter_func)
+
+    def _prepare_output_path(self, tokenized_file: Path) -> Path:
+        tokenized_file_rel = tokenized_file.relative_to(self._tokenized_data_path)
+        output_path = self._output_folder / tokenized_file_rel.with_suffix(".filtered.pbin")
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+        return output_path
+
+
+def make_filter_func(
+    scores: list[dict[str, float]], thresholds: dict[str, float]
+) -> Callable[[tuple[int, dict[str, NDArray[np.int_]]]], bool]:
+    """
+    Creates a filter function that checks if the scores of each sample meet the specified thresholds.
+
+    Args:
+        scores (list[dict[str, float]]): A list of dictionaries containing scores for each sample.
+        thresholds (dict[str, float]): A dictionary where keys are score names and values are the thresholds to
+            filter samples.
+    Returns:
+        Callable[[tuple[int, dict[str, NDArray[np.int_]]]], bool]: A function that takes an item (index and
+            sample) and returns True if the sample meets the thresholds, otherwise False.
+    """
+
+    def filter_func(item: tuple[int, dict[str, NDArray[np.int_]]]) -> bool:
+        idx, _ = item
+        score_entry = scores[idx]
+        for score_key, threshold in thresholds.items():
+            if score_entry[score_key] < threshold:
+                return False
+        return True
+
+    return filter_func
diff --git a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py
new file mode 100644
index 00000000..327bfa06
--- /dev/null
+++ b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py
@@ -0,0 +1,167 @@
+import json
+import logging
+from pathlib import Path
+from typing import Callable, Iterable, Literal
+
+from datatrove.data import DocumentsPipeline
+from datatrove.io import DataFileLike, DataFolderLike
+from datatrove.pipeline.readers.base import BaseDiskReader
+
+
+class ScoresParser(BaseDiskReader):
+    """
+    A parser that reads a JSONL file containing scores for samples and maps them to the
+    corresponding tokenized data files.
+
+    Each entry in the JSONL file is expected to have a "document_id" field that contains a base
+    file hash and an index, and the scores for that sample.
+    """
+
+    name = "ScoresParser"
+    # type = "Parser"
+    _requires_dependencies = []
+
+    SCORE_ENTRIES_KEY = "score_entries"
+    TOKENIZED_FILE_KEY = "tokenized_file"
+
+    def __init__(
+        self,
+        data_folder: DataFolderLike,
+        score_keys: Iterable[str],
+        tokenized_data_path: Path,
+        base_file_prefix: Path = Path(""),
+        tokenized_data_extension: str = ".pbin",
+        compression: Literal["infer", "gzip", "zstd"] | None = "infer",
+        paths_file: DataFileLike | None = None,
+        limit: int = -1,
+        skip: int = 0,
+        file_progress: bool = False,
+        doc_progress: bool = False,
+        adapter: Callable | None = None,
+        text_key: str = "text",
+        id_key: str = "id",
+        default_metadata: dict | None = None,
+        recursive: bool = True,
+        glob_pattern: str | None = None,
+        shuffle_files: bool = False,
+    ):
+        super().__init__(
+            data_folder=data_folder,
+            paths_file=paths_file,
+            limit=limit,
+            skip=skip,
+            file_progress=file_progress,
+            doc_progress=doc_progress,
+            adapter=adapter,
+            text_key=text_key,
+            id_key=id_key,
+            default_metadata=default_metadata,
+            recursive=recursive,
+            glob_pattern=glob_pattern,
+            shuffle_files=shuffle_files,
+        )
+        self._score_keys = list(score_keys)
+        assert len(self._score_keys) > 0, "At least one score key must be provided."
+        self._tokenized_data_path = tokenized_data_path
+        self._base_file_prefix = base_file_prefix
+        self._tokenized_data_extension = tokenized_data_extension
+        self._compression = compression
+
+    def read_file(self, filepath: str) -> DocumentsPipeline:
+        """
+        Turns a given JSONL file into a Document object containing the path to the corresponding tokenized data file
+        and a list of dictionaries with the scores for each sample in the file.
+
+        Args:
+            filepath: path of the file to read
+
+        Returns: a list containing a single Document
+        """
+        base_file_path_or_name, scores_as_list = self._parse_scores_jsonl_file(filepath)
+        tokenized_data_path = self._map_to_tokenized_data_path(base_file_path_or_name)
+        doc_content = {
+            "text": ".",  # Text needs to be non-empty.
+            self.SCORE_ENTRIES_KEY: scores_as_list,
+            self.TOKENIZED_FILE_KEY: tokenized_data_path,
+        }
+        document = self.get_document_from_dict(doc_content, filepath, 0)
+        return [document]
+
+    def _parse_scores_jsonl_file(self, filepath: str) -> tuple[str, list[dict[str, float]]]:
+        """
+        Parse a JSONL file containing scores for documents.
+
+        Preserves the original file order, even when duplicate document_ids exist.
+        Duplicates are disambiguated with numeric suffixes, but order is preserved.
+        """
+        scores_for_document_idx: list[tuple[str, dict[str, float]]] = []
+        duplicate_counts: dict[str, int] = {}  # track counts per original document_id
+        processed_count = 0
+
+        with self.data_folder.open(filepath, "r", compression=self._compression) as f:
+            for line_number, line in enumerate(f, start=1):
+                processed_count += 1
+                file_data = json.loads(line)
+                document_id = file_data.get("document_id")
+
+                if any(
+                    doc_id == document_id or doc_id.startswith(f"{document_id}_")
+                    for doc_id, _ in scores_for_document_idx
+                ):
+                    # Generate a new unique ID for duplicates
+                    dup_count = duplicate_counts.get(document_id, 0) + 1
+                    duplicate_counts[document_id] = dup_count
+                    new_id = f"{document_id}_{dup_count}"
+                    while any(doc_id == new_id for doc_id, _ in scores_for_document_idx):
+                        dup_count += 1
+                        duplicate_counts[document_id] = dup_count
+                        new_id = f"{document_id}_{dup_count}"
+                    document_id = new_id
+
+                # Append to list to preserve original order
+                scores_for_document_idx.append(
+                    (document_id, {k: float(file_data[k]) for k in self._score_keys})
+                )
+        self._verify_unique_ids(filepath, dict(scores_for_document_idx), processed_count)
+        return filepath, [score_dict for _, score_dict in scores_for_document_idx]
+
+    def _verify_unique_ids(self, filepath: str, scores_for_document_idx: dict[str, dict], processed_count: int):
+        """Verify that the number of unique document IDs matches the number of processed lines.
+
+        Args:
+            filepath: Path to the scores JSONL file.
+            scores_for_document_idx: Mapping of document_id to its score dict.
+            processed_count: Number of lines read from the scores file.
+        """
+        unique_ids = len(scores_for_document_idx)
+        if unique_ids != processed_count:
+            raise ValueError(
+                f"Mismatch in number of samples in scores file {filepath}: "
+                f"unique_ids={unique_ids} processed_lines={processed_count}."
+            )
+
+    def _map_to_tokenized_data_path(self, base_file_path: Path | str) -> Path:
+        """
+        Maps a base file path to the corresponding tokenized data path.
+
+        Args:
+            base_file_path (Path | str): The path of the base file.
+        Returns:
+            Path: The path to the tokenized data file.
+        """
+        if isinstance(base_file_path, str):
+            base_file_path = Path(base_file_path)
+
+        # When prefix is effectively empty ("" or ".") just take the file name.
+        if str(self._base_file_prefix) in {"", "."}:
+            base_name = base_file_path.name  # ensure we only use the filename portion
+            base_file_rel = Path(base_name)
+        else:
+            # Use relative_to only if possible; otherwise fall back to filename.
+            try:
+                base_file_rel = base_file_path.relative_to(self._base_file_prefix)
+            except Exception:
+                base_file_rel = Path(base_file_path.name)
+
+        tokenized_rel = base_file_rel.with_suffix(self._tokenized_data_extension)
+        tokenized_data_path = self._tokenized_data_path / tokenized_rel
+        if not tokenized_data_path.exists():
+            raise FileNotFoundError(f"Tokenized data file {tokenized_data_path} does not exist.")
+        return tokenized_data_path
diff --git a/tests/score_based_filtering/test_filter_pipeline.py b/tests/score_based_filtering/test_filter_pipeline.py
new file mode 100644
index 00000000..543bb1ba
--- /dev/null
+++ b/tests/score_based_filtering/test_filter_pipeline.py
@@ -0,0 +1,341 @@
+"""Tests for score-based filtering pipeline.
+
+Includes:
+  - Unit test for `make_filter_func` threshold logic.
+  - Parser test validating duplicate handling.
+  - Order preservation/content test (basic file existence & size checks).
+  - Full round-trip tokenize -> filter -> detokenize verification using real packed data format.
+
+Performance note: A simple tokenizer cache avoids repeatedly reloading the HF tokenizer across helper functions.
+"""
+
+# Standard library imports
+import json
+import shutil
+import tempfile
+import unittest
+from pathlib import Path
+from typing import Any
+
+# Third-party imports
+import numpy as np
+from datatrove.executor import LocalPipelineExecutor
+
+from ml_filter.data_processing.score_based_filtering.step_data_filtering import make_filter_func
+from ml_filter.data_processing.score_based_filtering.step_score_parsing import ScoresParser
+from ml_filter.data_processing.score_based_filtering.filter_pipeline import build_pipeline
+
+# ---------------------------------------------------------------------------
+# Helper constants & functions
+# ---------------------------------------------------------------------------
+_TOKENIZER_CACHE: dict[str, Any] = {}
+
+HEADER_SIZE = 64  # Mimics EmbeddedStreamData.HEADER_SIZE_IN_BYTES (simplified for tests)
+DATA_SECTION_LEN_BYTES = 8
+TOKEN_SIZE_DESC_LEN_BYTES = 4
+
+
+def _write_jsonl(records: list[dict], path: Path) -> None:
+    path.parent.mkdir(parents=True, exist_ok=True)
+    with path.open("w", encoding="utf-8") as f:
+        for r in records:
+            f.write(json.dumps(r, ensure_ascii=False) + "\n")
+
+
+def _get_tokenizer(name: str):
+    from transformers import AutoTokenizer
+
+    if name not in _TOKENIZER_CACHE:
+        _TOKENIZER_CACHE[name] = AutoTokenizer.from_pretrained(name)
+    return _TOKENIZER_CACHE[name]
+
+
+def _tokenize_to_pbin(
+    raw_jsonl: Path, tokenized_file: Path, tokenizer_name: str = "bert-base-multilingual-cased"
+) -> None:
+    """Tokenizer writer producing a valid packed file using modalities header conventions."""
+    import pickle
+
+    from modalities.dataloader.create_packed_data import (
+        EmbeddedStreamData,
+        update_data_length_in_pre_allocated_header,
+    )
+
+    tokenizer = _get_tokenizer(tokenizer_name)
+    index_list: list[tuple[int, int]] = []
+    tokenized_file.parent.mkdir(parents=True, exist_ok=True)
+    with raw_jsonl.open("r", encoding="utf-8") as f_in, tokenized_file.open("wb") as f_out:
+        # Pre-allocate header with zero data length then patch
+        f_out.write((0).to_bytes(EmbeddedStreamData.DATA_SECTION_LENGTH_IN_BYTES, byteorder="little"))
+        f_out.write((4).to_bytes(EmbeddedStreamData.TOKEN_SIZE_DESCRIPTOR_LENGTH_IN_BYTES, byteorder="little"))
+        header_bytes_written = (
+            EmbeddedStreamData.DATA_SECTION_LENGTH_IN_BYTES + EmbeddedStreamData.TOKEN_SIZE_DESCRIPTOR_LENGTH_IN_BYTES
+        )
+        if header_bytes_written < EmbeddedStreamData.HEADER_SIZE_IN_BYTES:
+            f_out.write(b"\x00" * (EmbeddedStreamData.HEADER_SIZE_IN_BYTES - header_bytes_written))
+        curr_offset = 0
+        for line in f_in:
+            text = json.loads(line)["text"]
+            if not text.strip():
+                continue
+            enc = tokenizer(text, truncation=True, max_length=None, add_special_tokens=True)
+            arr = np.array(enc["input_ids"], dtype=np.uint32)
+            # Write the little-endian uint32 token bytes and record (offset, length) in the index.
+            bytes_chunk = arr.astype("<u4").tobytes()
+            f_out.write(bytes_chunk)
+            index_list.append((curr_offset, len(bytes_chunk)))
+            curr_offset += len(bytes_chunk)
+        # Append the pickled index after the data section.
+        f_out.write(pickle.dumps(index_list))
+    update_data_length_in_pre_allocated_header(tokenized_file, index_list)
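Taken together, the helpers above mirror how the round-trip test further below prepares its fixture; a condensed sketch using these module-local helpers (the paths are hypothetical, and the first call downloads/caches the HF tokenizer):

```python
from pathlib import Path

# Hypothetical temp layout; mirrors TestTokenizeFilterDetokenizeRoundTrip.setUp.
raw_jsonl = Path("/tmp/demo/raw/sample.jsonl")
tokenized_file = Path("/tmp/demo/tokenized/sample.pbin")

_write_jsonl([{"text": "hello world"}, {"text": "bonjour le monde"}], raw_jsonl)
_tokenize_to_pbin(raw_jsonl, tokenized_file)  # writes a packed .pbin readable by modalities
```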
+
+
+def _write_minimal_packed(tokens_per_sample: list[list[int]], path: Path) -> None:
+    """Create a minimal packed data file with given tokens per sample using modalities conventions."""
+    import pickle
+
+    from modalities.dataloader.create_packed_data import (
+        EmbeddedStreamData,
+        update_data_length_in_pre_allocated_header,
+    )
+
+    path.parent.mkdir(parents=True, exist_ok=True)
+    index_list: list[tuple[int, int]] = []
+    with path.open("wb") as f_out:
+        f_out.write((0).to_bytes(EmbeddedStreamData.DATA_SECTION_LENGTH_IN_BYTES, byteorder="little"))
+        f_out.write((4).to_bytes(EmbeddedStreamData.TOKEN_SIZE_DESCRIPTOR_LENGTH_IN_BYTES, byteorder="little"))
+        header_written = (
+            EmbeddedStreamData.DATA_SECTION_LENGTH_IN_BYTES + EmbeddedStreamData.TOKEN_SIZE_DESCRIPTOR_LENGTH_IN_BYTES
+        )
+        if header_written < EmbeddedStreamData.HEADER_SIZE_IN_BYTES:
+            f_out.write(b"\x00" * (EmbeddedStreamData.HEADER_SIZE_IN_BYTES - header_written))
+        curr_offset = 0
+        for sample_tokens in tokens_per_sample:
+            arr = np.array(sample_tokens, dtype=np.uint32)
+            # Write the little-endian uint32 token bytes and record (offset, length) in the index.
+            chunk = arr.astype("<u4").tobytes()
+            f_out.write(chunk)
+            index_list.append((curr_offset, len(chunk)))
+            curr_offset += len(chunk)
+        # Append the pickled index after the data section.
+        f_out.write(pickle.dumps(index_list))
+    update_data_length_in_pre_allocated_header(path, index_list)
+
+
+def _detokenize_packed(packed_file: Path, tokenizer_name: str = "bert-base-multilingual-cased") -> list[str]:
+    """Load a packed .pbin file via modalities and detokenize each sample back to text using the tokenizer.
+
+    This relies on the PackedMemMapDatasetBase interface to iterate samples.
+    """
+    from modalities.dataloader.dataset import PackedMemMapDatasetBase
+
+    tokenizer = _get_tokenizer(tokenizer_name)
+    dataset = PackedMemMapDatasetBase(packed_file, sample_key="input_ids", load_index=True)
+    texts: list[str] = []
+    for i in range(len(dataset)):
+        tokens = dataset[i]["input_ids"].tolist()
+        # Handle potential special tokens gracefully
+        decoded = tokenizer.decode(tokens, skip_special_tokens=True).strip()
+        texts.append(decoded)
+    return texts
+
+
+# ---------------------------------------------------------------------------
+# Tests
+# ---------------------------------------------------------------------------
+class TestMakeFilterFunc(unittest.TestCase):
+    def test_filter_func_threshold_logic(self):
+        scores = [
+            {"score_A": 1.0, "score_B": 5.0},
+            {"score_A": 2.5, "score_B": 4.9},
+            {"score_A": 3.0, "score_B": 10.0},
+        ]
+        thresholds = {"score_A": 2.0, "score_B": 5.0}
+        f = make_filter_func(scores, thresholds)
+        # Index 0 fails score_A
+        self.assertFalse(f((0, {})))
+        # Index 1 fails score_B
+        self.assertFalse(f((1, {})))
+        # Index 2 passes both
+        self.assertTrue(f((2, {})))
+
+
+class TestScoresParserDuplicates(unittest.TestCase):
+    def setUp(self):
+        self.tmp_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tmp_dir)
+        self.scores_dir = Path(self.tmp_dir) / "scores"
+        self.scores_dir.mkdir(parents=True, exist_ok=True)
+        self.tokenized_dir = Path(self.tmp_dir) / "tokenized"
+        self.tokenized_dir.mkdir(parents=True, exist_ok=True)
+        (self.tokenized_dir / "file1.pbin").write_bytes(b"dummy")
+        score_lines = [
+            '{"document_id": "file1", "score_A": 1.0, "score_B": 6.0}',
+            '{"document_id": "file1", "score_A": 2.0, "score_B": 7.0}',
+            '{"document_id": "file2", "score_A": 4.0, "score_B": 9.0}',
+        ]
+        (self.scores_dir / "file1.jsonl").write_text("\n".join(score_lines) + "\n", encoding="utf-8")
+        self.parser = ScoresParser(
+            data_folder=str(self.scores_dir),
+            score_keys=["score_A", "score_B"],
+            tokenized_data_path=self.tokenized_dir,
+            base_file_prefix=Path(""),
+        )
+
+    def test_parsing_handles_duplicate_ids(self):
+        docs_pipeline = self.parser.read_file("file1.jsonl")
+        self.assertEqual(len(docs_pipeline), 1)
+        doc = docs_pipeline[0]
+        metadata = doc.metadata
+        score_entries = metadata[ScoresParser.SCORE_ENTRIES_KEY]
+        # We expect 3 entries: file1, duplicate renamed to file1_1, and file2
+        self.assertEqual(len(score_entries), 3)
+        expected_scores = [
+            {"score_A": 1.0, "score_B": 6.0},
+            {"score_A": 2.0, "score_B": 7.0},
+            {"score_A": 4.0, "score_B": 9.0},
+        ]
+        self.assertEqual(score_entries, expected_scores)
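For context on what the parser test exercises, a usage sketch for `ScoresParser` outside the test fixtures (folder layout and file names are hypothetical; each scores JSONL must have a matching tokenized `.pbin` file):

```python
from pathlib import Path

from ml_filter.data_processing.score_based_filtering.step_score_parsing import ScoresParser

# Hypothetical layout: annotations/part_000.jsonl scores the samples of tokenized/part_000.pbin.
parser = ScoresParser(
    data_folder="annotations",
    score_keys=["score_Gemma_Snowflake", "score_Llama_Snowflake"],
    tokenized_data_path=Path("tokenized"),
)

for document in parser.read_file("part_000.jsonl"):
    print(document.metadata[ScoresParser.TOKENIZED_FILE_KEY])                      # tokenized/part_000.pbin
    print(len(document.metadata[ScoresParser.SCORE_ENTRIES_KEY]), "samples scored")
```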
+
+
+class TestFilteringOrderPreservation(unittest.TestCase):
+    def setUp(self):
+        self.tmp_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tmp_dir)
+        self.scores_dir = Path(self.tmp_dir) / "scores"
+        self.scores_dir.mkdir(parents=True, exist_ok=True)
+        self.tokenized_dir = Path(self.tmp_dir) / "tokenized"
+        self.tokenized_dir.mkdir(parents=True, exist_ok=True)
+        self.output_dir = Path(self.tmp_dir) / "outputs"
+        self.output_dir.mkdir(parents=True, exist_ok=True)
+        # Packed dataset with 5 samples corresponding to order0..order4
+        order_file = self.tokenized_dir / "order.pbin"
+        order_tokens = [
+            [11, 12],
+            [13, 14, 15],
+            [16],
+            [17, 18],
+            [19, 20, 21],
+        ]
+        _write_minimal_packed(order_tokens, order_file)
+        self.sample_texts = ["alpha", "bravo", "charlie", "delta", "echo"]
+        scores_lines = [
+            '{"document_id": "order0", "score_A": 1.0, "score_B": 5.0}',
+            '{"document_id": "order1", "score_A": 2.0, "score_B": 5.0}',
+            '{"document_id": "order2", "score_A": 2.0, "score_B": 2.0}',
+            '{"document_id": "order3", "score_A": 5.0, "score_B": 5.0}',
+            '{"document_id": "order4", "score_A": 3.0, "score_B": 7.0}',
+        ]
+        (self.scores_dir / "order.jsonl").write_text("\n".join(scores_lines) + "\n", encoding="utf-8")
+        self.thresholds = {"score_A": 2.0, "score_B": 5.0}
+        self.expected_passing_indices = [1, 3, 4]
+        # Use real filter_dataset; we'll inspect size not textual content.
+
+    def test_filtered_order_and_content(self):
+        pipeline = build_pipeline(
+            score_path=self.scores_dir,
+            tokenized_data_path=self.tokenized_dir,
+            output_folder=self.output_dir,
+            thresholds=self.thresholds,
+            base_file_prefix=Path(""),
+            tokenized_data_extension=".pbin",
+        )
+        executor = LocalPipelineExecutor(pipeline=pipeline)
+        executor.run()
+        filtered_files = list(self.output_dir.rglob("order.filtered.pbin"))
+        self.assertEqual(len(filtered_files), 1)
+        # Basic sanity: filtered file should be non-empty
+        self.assertGreater(filtered_files[0].stat().st_size, 0, "Filtered file is unexpectedly empty.")
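The `rglob("order.filtered.pbin")` lookup above relies on how `DataFiltering._prepare_output_path` derives the output location; a sketch of that mapping with placeholder paths:

```python
from pathlib import Path

tokenized_data_path = Path("/data/tokenized")   # placeholder
output_folder = Path("/data/outputs")           # placeholder
tokenized_file = tokenized_data_path / "cc" / "part_000.pbin"

# The tokenized file's path relative to tokenized_data_path is re-rooted under
# output_folder and given a ".filtered.pbin" suffix.
relative = tokenized_file.relative_to(tokenized_data_path)
print(output_folder / relative.with_suffix(".filtered.pbin"))
# -> /data/outputs/cc/part_000.filtered.pbin
```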
+
+
+class TestTokenizeFilterDetokenizeRoundTrip(unittest.TestCase):
+    """End-to-end round trip using inlined tokenizer logic and the real filter_dataset."""
+
+    def setUp(self):
+        self.tmp_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tmp_dir)
+        self.raw_dir = Path(self.tmp_dir) / "raw"
+        self.tokenized_dir = Path(self.tmp_dir) / "tokenized"
+        self.output_dir = Path(self.tmp_dir) / "outputs"
+        self.scores_dir = Path(self.tmp_dir) / "scores"
+        for p in (self.raw_dir, self.tokenized_dir, self.output_dir, self.scores_dir):
+            p.mkdir(parents=True, exist_ok=True)
+
+        # Samples and scores definitions
+        self.samples = [
+            {"text": "hello world"},  # fail A
+            {"text": "bonjour le monde"},  # pass
+            {"text": "hola mundo"},  # pass
+            {"text": "ciao mondo"},  # fail B
+        ]
+        self.raw_jsonl = self.raw_dir / "sample.jsonl"
+        _write_jsonl(self.samples, self.raw_jsonl)
+        scores_lines = [
+            '{"document_id": "sample0", "score_A": 1.0, "score_B": 6.0}',
+            '{"document_id": "sample1", "score_A": 2.0, "score_B": 5.0}',
+            '{"document_id": "sample2", "score_A": 3.0, "score_B": 10.0}',
+            '{"document_id": "sample3", "score_A": 5.0, "score_B": 2.0}',
+        ]
+        (self.scores_dir / "sample.jsonl").write_text("\n".join(scores_lines) + "\n", encoding="utf-8")
+
+        # Tokenize
+        tokenized_file = self.tokenized_dir / "sample.pbin"
+        _tokenize_to_pbin(self.raw_jsonl, tokenized_file)
+
+        # Real filter_dataset will be used; we'll parse output later to approximate pass count via index length.
+        self.expected_texts = [self.samples[1]["text"], self.samples[2]["text"]]
+
+    def test_round_trip_filtering(self):
+        pipeline = build_pipeline(
+            score_path=self.scores_dir,
+            tokenized_data_path=self.tokenized_dir,
+            output_folder=self.output_dir,
+            thresholds={"score_A": 2.0, "score_B": 5.0},
+            base_file_prefix=Path(""),
+            tokenized_data_extension=".pbin",
+        )
+        executor = LocalPipelineExecutor(pipeline=pipeline)
+        executor.run()
+        filtered_files = list(self.output_dir.rglob("sample.filtered.pbin"))
+        self.assertEqual(len(filtered_files), 1)
+        self.assertGreater(filtered_files[0].stat().st_size, 0)
+
+        # Detokenize filtered file and verify the surviving texts correspond exactly to expected_texts
+        detok_texts = _detokenize_packed(filtered_files[0])
+
+        # Assert order and content match expected
+        self.assertEqual(len(detok_texts), len(self.expected_texts))
+        # Simple containment & order check (exact match)
+        self.assertEqual(detok_texts, self.expected_texts, "Detokenized filtered texts do not match expected output")
+
+
+class TestScoresParserOrdering(unittest.TestCase):
+    def setUp(self):
+        self.tmp_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tmp_dir)
+        self.scores_dir = Path(self.tmp_dir) / "scores"
+        self.tokenized_dir = Path(self.tmp_dir) / "tokenized"
+        self.scores_dir.mkdir(parents=True, exist_ok=True)
+        self.tokenized_dir.mkdir(parents=True, exist_ok=True)
+        (self.tokenized_dir / "samples.pbin").write_bytes(b"dummy")
+
+        scores_lines = [
+            '{"document_id": "sample1", "score_A": 1.0}',
+            '{"document_id": "sample2", "score_A": 2.0}',
+            '{"document_id": "sample10", "score_A": 10.0}',
+        ]
+        (self.scores_dir / "samples.jsonl").write_text("\n".join(scores_lines) + "\n", encoding="utf-8")
+
+        self.parser = ScoresParser(
+            data_folder=str(self.scores_dir),
+            score_keys=["score_A"],
+            tokenized_data_path=self.tokenized_dir,
+            base_file_prefix=Path(""),
+        )
+
+    def test_preserves_original_document_order(self):
+        docs_pipeline = self.parser.read_file("samples.jsonl")
+        self.assertEqual(len(docs_pipeline), 1)
+        score_entries = docs_pipeline[0].metadata[ScoresParser.SCORE_ENTRIES_KEY]
+        expected_scores = [
+            {"score_A": 1.0},
+            {"score_A": 2.0},
+            {"score_A": 10.0},
+        ]
+        self.assertEqual(score_entries, expected_scores)
+
+
+if __name__ == "__main__":
+    unittest.main()
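Finally, for illustration of the threshold semantics that the tests rely on: `make_filter_func` keeps a sample only if every configured score meets its threshold, and the returned callable only uses the sample index.

```python
from ml_filter.data_processing.score_based_filtering.step_data_filtering import make_filter_func

scores = [{"score_A": 1.5, "score_B": 9.0}, {"score_A": 3.0, "score_B": 5.0}]
keep = make_filter_func(scores, thresholds={"score_A": 2.0, "score_B": 5.0})

# filter_dataset passes (index, sample); the sample payload itself is ignored here.
assert keep((0, {})) is False  # score_A 1.5 < 2.0 -> dropped
assert keep((1, {})) is True   # both scores >= their thresholds -> kept
```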