From 971a0691dd2da618899214a96f5ec3ba01ad0b0e Mon Sep 17 00:00:00 2001
From: Timm Ruland
Date: Fri, 25 Jul 2025 10:38:30 +0200
Subject: [PATCH 01/10] feat(filtering): Added a datatrove based pipeline for filtering tokenized data using scores.

- Included an example configuration file.
- Added datatrove and pydantic-settings to requirements.
- Note that modalities is also required for the pipeline to work, but it is not included in the requirements file.
---
 .../example_filter_pipeline_config.yaml      |  22 ++
 pyproject.toml                               |   2 +
 .../score_based_filtering/__init__.py        |   0
 .../score_based_filtering/filter_pipeline.py | 245 ++++++++++++++++++
 .../step_data_filtering.py                   |  98 +++++++
 .../step_score_parsing.py                    | 127 +++++++++
 6 files changed, 494 insertions(+)
 create mode 100644 configs/data_processing/example_filter_pipeline_config.yaml
 create mode 100644 src/ml_filter/data_processing/score_based_filtering/__init__.py
 create mode 100644 src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py
 create mode 100644 src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py
 create mode 100644 src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py

diff --git a/configs/data_processing/example_filter_pipeline_config.yaml b/configs/data_processing/example_filter_pipeline_config.yaml
new file mode 100644
index 00000000..73156384
--- /dev/null
+++ b/configs/data_processing/example_filter_pipeline_config.yaml
@@ -0,0 +1,22 @@
+params:
+  score_path: /path/to/annotations
+  tokenized_data_path: /path/to/tokenized
+  output_folder: /path/to/filtered
+
+  thresholds:
+    score_Gemma_Snowflake: 3.0
+    score_Llama_Snowflake: 2.0
+
+  hash_to_base_file_mapping_csv: path/to/hashes.csv
+  base_file_prefix: /path/to/raw
+  tokenized_data_extension: .pbin
+
+running_on_slurm: false
+
+local_settings:
+  tasks: 1
+  local_tasks: 1
+  local_rank_offset: 0
+  logging_dir: null
+
+slurm_settings: null
diff --git a/pyproject.toml b/pyproject.toml
index 17f3eeea..508ace55 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,6 +9,7 @@ dependencies = [
     "tqdm",
     "datasets",
     "pydantic",
+    "pydantic-settings",
     "transformers @ git+https://github.com/huggingface/transformers.git@v4.49.0-Gemma-3",
     "click",
     "click_pathlib",
@@ -28,6 +29,7 @@ dependencies = [
     "python-dotenv",
     "jq",
     "tabulate",
+    "datatrove",
 ]
 
 [project.optional-dependencies]
diff --git a/src/ml_filter/data_processing/score_based_filtering/__init__.py b/src/ml_filter/data_processing/score_based_filtering/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py
new file mode 100644
index 00000000..977ab070
--- /dev/null
+++ b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py
@@ -0,0 +1,245 @@
+from __future__ import annotations
+
+import csv
+import os
+import sys
+from pathlib import Path
+
+from datatrove.executor import LocalPipelineExecutor, SlurmPipelineExecutor
+from datatrove.pipeline.base import PipelineStep
+from pydantic import BaseModel, Field, model_validator
+from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, SettingsConfigDict, YamlConfigSettingsSource
+
+from ml_filter.data_processing.score_based_filtering.step_data_filtering import DataFiltering
+from ml_filter.data_processing.score_based_filtering.step_score_parsing import ScoresParser
+
+
+class FilterPipelineBuilder(BaseSettings):
+    """Configuration parameters and builder for the score-based filtering pipeline.
+    This class defines the settings for running a data filtering pipeline that processes datasets based on scores.
+    It includes parameters for both local and Slurm execution environments.
+    The pipeline consists of steps for parsing scores and filtering datasets based on those scores.
+
+    Besides initializing this class directly, it can also be configured using a YAML file or environment variables.
+    The YAML file can be specified using the `FILTER_PIPELINE_YAML_FILE` environment variable.
+    If no YAML file is provided, the class will use default settings and environment variables.
+    """
+
+    model_config = SettingsConfigDict(env_prefix="filter_pipeline_", env_nested_delimiter="__")
+
+    # Pipeline configuration parameters
+    params: FilterPipelineParameters
+
+    # Execution parameters
+    running_on_slurm: bool = False
+    local_settings: LocalExecutionSettings | None = None
+    slurm_settings: SlurmExecutionSettings | None = None
+
+    @model_validator(mode="after")
+    def slurm_vs_local(self):
+        if self.running_on_slurm and self.local_settings is not None:
+            raise ValueError("Running on Slurm requires slurm execution settings, not local settings.")
+        if self.running_on_slurm and self.slurm_settings is None:
+            self.slurm_settings = SlurmExecutionSettings()
+        elif not self.running_on_slurm and self.slurm_settings is not None:
+            raise ValueError("Running locally requires local execution settings, not Slurm settings.")
+        if not self.running_on_slurm and self.local_settings is None:
+            self.local_settings = LocalExecutionSettings()
+        return self
+
+    @model_validator(mode="after")
+    def set_logging_dir(self):
+        if self.local_settings is not None and self.local_settings.logging_dir is None:
+            self.local_settings.logging_dir = str(self.params.output_folder / "logs")
+        if self.slurm_settings is not None and self.slurm_settings.logging_dir is None:
+            self.slurm_settings.logging_dir = str(self.params.output_folder / "logs")
+        return self
+
+    def build_pipeline_executor(self) -> LocalPipelineExecutor | SlurmPipelineExecutor:
+        """Builds the appropriate pipeline executor based on the execution settings."""
+        pipeline = self._build_pipeline()
+        if self.running_on_slurm:
+            return SlurmPipelineExecutor(pipeline=pipeline, **self.slurm_settings.model_dump())
+        else:
+            return LocalPipelineExecutor(pipeline=pipeline, **self.local_settings.model_dump())
+
+    def _build_pipeline(self) -> list[PipelineStep]:
+        """Builds the pipeline based on the provided configuration."""
+        return build_pipeline(
+            score_path=self.params.score_path,
+            tokenized_data_path=self.params.tokenized_data_path,
+            output_folder=self.params.output_folder,
+            thresholds=self.params.thresholds,
+            hash_to_base_file_mapping_csv=self.params.hash_to_base_file_mapping_csv,
+            base_file_prefix=self.params.base_file_prefix,
+            tokenized_data_extension=self.params.tokenized_data_extension,
+        )
+
+    @classmethod
+    def settings_customise_sources(
+        cls,
+        settings_cls: type[BaseSettings],
+        init_settings: PydanticBaseSettingsSource,
+        env_settings: PydanticBaseSettingsSource,
+        dotenv_settings: PydanticBaseSettingsSource,
+        file_secret_settings: PydanticBaseSettingsSource,
+    ) -> tuple[PydanticBaseSettingsSource, ...]:
+        return (
+            init_settings,
+            env_settings,
+            YamlConfigSettingsSource(settings_cls, yaml_file=os.getenv("FILTER_PIPELINE_YAML_FILE")),
+            dotenv_settings,
+            file_secret_settings,
+        )
+
+
+class FilterPipelineParameters(BaseModel):
+    """Parameters for the score-based filtering pipeline."""
+
+    score_path: Path = Field(..., description="The path to the directory containing JSONL files with scores.")
+    tokenized_data_path: Path = Field(..., description="The path for the tokenized data files.")
+    output_folder: Path = Field(..., description="The folder where the filtered datasets will be saved.")
+    thresholds: dict[str, float] = Field(
+        ..., description="Dictionary where keys are score names and values are thresholds to filter samples."
+    )
+    hash_to_base_file_mapping_csv: Path = Field(
+        ..., description="CSV file mapping base file hashes to their corresponding paths."
+    )
+    base_file_prefix: Path = Field(
+        default=Path(""),
+        description="The prefix path for the raw/base files. This prefix will be removed "
+        "when mapping from the raw files to the corresponding tokenized files",
+    )
+    tokenized_data_extension: str = Field(
+        default=".pbin", description="The file extension for the tokenized data files."
+    )
+
+
+class LocalExecutionSettings(BaseModel):
+    """Settings for running the pipeline locally."""
+
+    tasks: int = 1
+    local_tasks: int = 1
+    local_rank_offset: int = 0
+    logging_dir: str | None = None
+
+
+class SlurmExecutionSettings(BaseModel):
+    """Settings for running the pipeline on a Slurm cluster."""
+
+    tasks: int = 1
+    time: str = "00:15:00"
+    partition: str = "default"
+    account: str | None = None  # FIXME is this supported?
+    cpus_per_task: int = 1
+    mem_per_cpu_gb: int = 2
+    workers: int = -1
+    job_name: str = "data_processing"
+    qos: str = "normal"
+    env_command: str | None = None
+    condaenv: str | None = None
+    venv_path: str | None = None
+    sbatch_args: dict[str, str] | None = None
+    max_array_size: int = 1001
+    depends_job_id: str | None = None
+    job_id_position: int = -1
+    # job_id_retriever: Callable | None = None
+    logging_dir: str | None = None
+    skip_completed: bool = True
+    slurm_logs_folder: str | None = None
+    max_array_launch_parallel: bool = False
+    stagger_max_array_jobs: int = 0
+    run_on_dependency_fail: bool = False
+    randomize_start_duration: int = 0
+    requeue_signals: tuple[str] | None = ("SIGUSR1",)
+    mail_type: str = "ALL"
+    mail_user: str | None = None
+    requeue: bool = True
+    srun_args: dict[str, str] | None = None
+    tasks_per_job: int = 1
+
+
+def run_pipeline(args: FilterPipelineBuilder) -> None:
+    """Runs a datatrove pipeline to filter datasets based on scores.
+    Args:
+        args (FilterPipelineBuilder): The configuration parameters for the pipeline.
+    """
+    executor = args.build_pipeline_executor()
+    executor.run()
+
+
+def build_pipeline(
+    score_path: Path,
+    tokenized_data_path: Path,
+    output_folder: Path,
+    thresholds: dict[str, float],
+    hash_to_base_file_mapping_csv: Path,
+    base_file_prefix: Path = Path(""),
+    tokenized_data_extension: str = ".pbin",
+) -> list[PipelineStep]:
+    """
+    Builds a datatrove pipeline for filtering datasets based on scores.
+    Args:
+        score_path (Path): The path to the directory containing JSONL files with scores.
+        tokenized_data_path (Path): The path for the tokenized data files.
+        output_folder (Path): The folder where the filtered datasets will be saved.
+        thresholds (dict[str, float]): A dictionary where keys are score names and values are the
+            thresholds to filter samples.
+        hash_to_base_file (dict[str, Path]): A mapping from base file hashes to their corresponding paths.
+        base_file_prefix (Path): The prefix path for the base files.
+        tokenized_data_extension (str): The file extension for the tokenized data files.
+    Returns:
+        list[PipelineStep]: A list containing the pipeline steps for filtering datasets.
+ """ + assert score_path.is_dir(), f"Score path {score_path} must be a directory." + assert output_folder.is_dir(), f"Output folder {output_folder} must be a directory." + assert len(thresholds) > 0, "At least one threshold must be provided." + assert ( + hash_to_base_file_mapping_csv.is_file() + ), f"Hash to base file mapping {hash_to_base_file_mapping_csv} must be a file." + hash_to_base_file = read_hash_to_base_file_mapping(hash_to_base_file_mapping_csv) + pipeline: list[PipelineStep] = [ + ScoresParser( + data_folder=str(score_path), + score_keys=list(thresholds.keys()), + hash_to_base_file=hash_to_base_file, + tokenized_data_path=tokenized_data_path, + base_file_prefix=base_file_prefix, + tokenized_data_extension=tokenized_data_extension, + ), + DataFiltering( + output_folder=output_folder, + thresholds=thresholds, + tokenized_data_path=tokenized_data_path, + ), + ] + return pipeline + + +def read_hash_to_base_file_mapping(csv_file: Path) -> dict[str, Path]: + """ + Reads a CSV file containing a mapping from base file hashes to their corresponding paths. + Args: + csv_file (Path): The path to the CSV file. + Returns: + dict[str, Path]: A dictionary mapping base file hashes to their corresponding paths. + """ + hash_to_base_file: dict[str, Path] = {} + with open(csv_file, "r") as f: + reader = csv.DictReader(f) + for row in reader: + hash_to_base_file[row["md5"]] = Path(row["file_path"]) + return hash_to_base_file + + +if __name__ == "__main__": + if len(sys.argv) > 1 or not (yaml_file := os.getenv("FILTER_PIPELINE_YAML_FILE")) or not os.path.isfile(yaml_file): + print( + "This script is intended to be used with a YAML configuration " + "file set via the environment variable `FILTER_PIPELINE_YAML_FILE`.\n" + "If you want to run it without a YAML file, please import from it " + "and use the FilterPipelineBuilder class directly." + ) + exit(1) + args = FilterPipelineBuilder() + run_pipeline(args) diff --git a/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py b/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py new file mode 100644 index 00000000..9f445316 --- /dev/null +++ b/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py @@ -0,0 +1,98 @@ +import dataclasses +import logging +from pathlib import Path +from typing import Callable + +import numpy as np +from datatrove.data import Document, DocumentsPipeline +from datatrove.pipeline.base import PipelineStep +from numpy.typing import NDArray + +from ml_filter.data_processing.score_based_filtering.step_score_parsing import ScoresParser + +try: + from modalities.dataloader.filter_packed_data import filter_dataset +except ImportError: + logging.error("The filtering pipeline requires the 'modalities' package to be installed.") + exit(1) + + +class DataFiltering(PipelineStep): + """ + A class to filter datasets based on scores and specified thresholds. + This class is designed to be used within a datatrove pipeline. + For a given list of score dictionaries, it filters the corresponding tokenized dataset files + based on the provided thresholds for each score. + The resulting filtered datasets are saved in the specified output folder. + Args: + output_folder (Path): The folder where the filtered datasets will be saved. + thresholds (dict[str, float]): A dictionary where keys are score names and values are the + thresholds to filter samples. + tokenized_data_path (Path): The path for the tokenized data files. 
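+    Note:
+        Filtered datasets mirror the relative layout of the inputs under tokenized_data_path and are
+        written to output_folder with the suffix ".filtered.pbin" (see _prepare_output_path below).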
+ Raises: + AssertionError: If the output folder is not a directory or if no thresholds are provided. + """ + + name = "DataFiltering" + type = "Filter" + _requires_dependencies = [] + + def __init__(self, output_folder: Path, thresholds: dict[str, float], tokenized_data_path: Path = Path("")): + super().__init__() + self._output_folder = output_folder + assert self._output_folder.is_dir(), f"Output folder {self._output_folder} must be a directory." + self._thresholds = thresholds + assert len(self._thresholds) > 0, "At least one threshold must be provided." + self._tokenized_data_path = tokenized_data_path + + def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline: + for document in data: + with self.track_time(): + self._filter_document(document) + yield document + + def _filter_document(self, document: Document): + """ + Filters a single, tokenized dataset based on the scores contained in the document. + Args: + document (Document): The document containing scores and the path to the tokenized data file. + Raises: + ValueError: If the document does not contain the required keys or if the tokenized file path is invalid. + """ + document: dict[str, list[dict[str, float]] | str] = dataclasses.asdict(document) + scores: list[dict[str, float]] = document["metadata"][ScoresParser.SCORE_ENTRIES_KEY] + tokenized_file = Path(document["metadata"][ScoresParser.TOKENIZED_FILE_KEY]) + output_path = self._prepare_output_path(tokenized_file) + filter_func = make_filter_func(scores, self._thresholds) + filter_dataset(src_path=tokenized_file, dst_path=output_path, filter_func=filter_func, sample_key="input_ids") + + def _prepare_output_path(self, tokenized_file: Path) -> Path: + tokenized_file_rel = tokenized_file.relative_to(self._tokenized_data_path) + output_path = self._output_folder / tokenized_file_rel.with_suffix(".filtered.pbin") + output_path.parent.mkdir(parents=True, exist_ok=True) + return output_path + + +def make_filter_func( + scores: list[dict[str, float]], thresholds: dict[str, float] +) -> Callable[[tuple[int, dict[str, NDArray[np.int_]]]], bool]: + """ + Creates a filter function that checks if the scores of each sample meet the specified thresholds. + Args: + scores (list[dict[str, float]]): A list of dictionaries containing scores for each sample. + thresholds (dict[str, float]): A dictionary where keys are score names and values are the thresholds to + filter samples. + Returns: + Callable[[tuple[int, dict[str, NDArray[np.int_]]]], bool]: A function that takes an item (index and + sample) and returns True if the sample meets the thresholds, otherwise False. 
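+    Example (illustrative; the score key is a placeholder and the sample payload is ignored by the check):
+        >>> scores = [{"score_A": 2.5}, {"score_A": 4.0}]
+        >>> keep = make_filter_func(scores, {"score_A": 3.0})
+        >>> keep((0, {}))
+        False
+        >>> keep((1, {}))
+        True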
+ """ + + def filter_func(item: tuple[int, dict[str, NDArray[np.int_]]]) -> bool: + idx, _ = item + score_entry = scores[idx] + for score_key, threshold in thresholds.items(): + if score_entry[score_key] < threshold: + return False + return True + + return filter_func diff --git a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py new file mode 100644 index 00000000..f1ee14a8 --- /dev/null +++ b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py @@ -0,0 +1,127 @@ +import json +from pathlib import Path +from typing import Callable, Iterable, Literal + +from datatrove.data import DocumentsPipeline +from datatrove.io import DataFileLike, DataFolderLike +from datatrove.pipeline.readers.base import BaseDiskReader + + +class ScoresParser(BaseDiskReader): + """ + A parser that reads a JSONL file containing scores for samples and maps them to the + corresponding tokenized data files. Each entry in the JSONL file is expected to have + a "document_id" field that contains a base file hash and an index, and the scores + for that sample. + """ + + name = "ScoresParser" + # type = "Parser" + _requires_dependencies = [] + + SCORE_ENTRIES_KEY = "score_entries" + TOKENIZED_FILE_KEY = "tokenized_file" + + def __init__( + self, + data_folder: DataFolderLike, + score_keys: Iterable[str], + hash_to_base_file: dict[str, Path], + tokenized_data_path: Path, + base_file_prefix: Path = Path(""), + tokenized_data_extension: str = ".pbin", + compression: Literal["infer", "gzip", "zstd"] | None = "infer", + paths_file: DataFileLike | None = None, + limit: int = -1, + skip: int = 0, + file_progress: bool = False, + doc_progress: bool = False, + adapter: Callable | None = None, + text_key: str = "text", + id_key: str = "id", + default_metadata: dict | None = None, + recursive: bool = True, + glob_pattern: str | None = None, + shuffle_files: bool = False, + ): + super().__init__( + data_folder=data_folder, + paths_file=paths_file, + limit=limit, + skip=skip, + file_progress=file_progress, + doc_progress=doc_progress, + adapter=adapter, + text_key=text_key, + id_key=id_key, + default_metadata=default_metadata, + recursive=recursive, + glob_pattern=glob_pattern, + shuffle_files=shuffle_files, + ) + self._score_keys = list(score_keys) + assert len(self._score_keys) > 0, "At least one score key must be provided." + self._hash_to_base_file = hash_to_base_file + self._tokenized_data_path = tokenized_data_path + self._base_file_prefix = base_file_prefix + self._tokenized_data_extension = tokenized_data_extension + self._compression = compression + + def read_file(self, filepath: str) -> DocumentsPipeline: + """ + Turns a given JSONL file into a Document object containing the path to the corresponding tokenized data file + and a list of dictionaries with the scores for each sample in the file. + Args: + filepath: path of the file to read + + Returns: generator of Document + """ + base_file_hash, scores_as_list = self._parse_scores_jsonl_file(filepath) + tokenized_data_path = self._map_to_tokenized_data_path(base_file_hash) + if not tokenized_data_path.exists(): + raise FileNotFoundError(f"Tokenized data file {tokenized_data_path} does not exist.") + doc_content = { + "text": ".", # Text needs to be non-empty. 
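+            # The real payload is carried as document metadata: the per-sample scores and the path to
+            # the tokenized file; DataFiltering later reads both from document["metadata"].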
+ self.SCORE_ENTRIES_KEY: scores_as_list, + self.TOKENIZED_FILE_KEY: tokenized_data_path, + } + document = self.get_document_from_dict(doc_content, filepath, 0) + return [document] + + def _parse_scores_jsonl_file(self, filepath: str) -> tuple[str, list[dict[str, float]]]: + scores_for_idx: dict[int, dict[str, float]] = {} + hashes: set[str] = set() + with self.data_folder.open(filepath, "r", compression=self._compression) as f: + for line in f: + file_data = json.loads(line) + base_file_hash, document_idx = file_data["document_id"].rsplit("_") + scores_for_idx[int(document_idx)] = {k: file_data[k] for k in self._score_keys} + hashes.add(base_file_hash) + self._verify_file_format(scores_for_idx, hashes) + scores_as_list = list(map(lambda x: x[1], sorted(scores_for_idx.items(), key=lambda x: x[0]))) + base_file_hash = next(iter(hashes)) + return base_file_hash, scores_as_list + + def _verify_file_format(self, scores_for_idx: dict[int, dict[str, float]], hashes: set[str]): + assert len(hashes) == 1, "All entries in the score file must refer to the same base file." + assert min(scores_for_idx.keys()) == 0 and max(scores_for_idx.keys()) + 1 == len( + scores_for_idx + ), "All indices in the score file must be continuous." + + def _map_to_tokenized_data_path(self, base_file_hash: str) -> Path: + """ + Maps a base file hash to the corresponding tokenized data path. + Args: + base_file_hash (str): The hash of the base file. + Returns: + Path: The path to the tokenized data file. + """ + if base_file_hash not in self._hash_to_base_file: + raise ValueError(f"Base file hash {base_file_hash} not found in the provided hash mapping.") + base_file = self._hash_to_base_file[base_file_hash] + base_file_rel = base_file.relative_to(self._base_file_prefix) + tokenized_rel = base_file_rel.with_suffix(self._tokenized_data_extension) + tokenized_data_path = self._tokenized_data_path / tokenized_rel + if not tokenized_data_path.exists(): + raise FileNotFoundError(f"Tokenized data file {tokenized_data_path} does not exist.") + return tokenized_data_path From 81aafa82579ead0c61f453ed85c52fabf2620a64 Mon Sep 17 00:00:00 2001 From: BlueCrescent <7198877+BlueCrescent@users.noreply.github.com> Date: Fri, 25 Jul 2025 10:43:29 +0200 Subject: [PATCH 02/10] chore(filtering): More robust doc id parsing. 
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../data_processing/score_based_filtering/step_score_parsing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py index f1ee14a8..13298e44 100644 --- a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py +++ b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py @@ -94,7 +94,7 @@ def _parse_scores_jsonl_file(self, filepath: str) -> tuple[str, list[dict[str, f with self.data_folder.open(filepath, "r", compression=self._compression) as f: for line in f: file_data = json.loads(line) - base_file_hash, document_idx = file_data["document_id"].rsplit("_") + base_file_hash, document_idx = file_data["document_id"].rsplit("_", 1) scores_for_idx[int(document_idx)] = {k: file_data[k] for k in self._score_keys} hashes.add(base_file_hash) self._verify_file_format(scores_for_idx, hashes) From b1d1a46fc74f94a7b19ad8454027391616d9d6f2 Mon Sep 17 00:00:00 2001 From: Timm Ruland Date: Fri, 25 Jul 2025 10:50:27 +0200 Subject: [PATCH 03/10] fix(filtering): Removed duplicate file exists check. --- .../data_processing/score_based_filtering/step_score_parsing.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py index 13298e44..3db1ae8f 100644 --- a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py +++ b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py @@ -78,8 +78,6 @@ def read_file(self, filepath: str) -> DocumentsPipeline: """ base_file_hash, scores_as_list = self._parse_scores_jsonl_file(filepath) tokenized_data_path = self._map_to_tokenized_data_path(base_file_hash) - if not tokenized_data_path.exists(): - raise FileNotFoundError(f"Tokenized data file {tokenized_data_path} does not exist.") doc_content = { "text": ".", # Text needs to be non-empty. self.SCORE_ENTRIES_KEY: scores_as_list, From af891823cb557e02bb9a285a3791bdb87893f7db Mon Sep 17 00:00:00 2001 From: BlueCrescent <7198877+BlueCrescent@users.noreply.github.com> Date: Fri, 25 Jul 2025 10:52:01 +0200 Subject: [PATCH 04/10] fix(filtering): fixed docstring Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../data_processing/score_based_filtering/filter_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py index 977ab070..ec1e5410 100644 --- a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py +++ b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py @@ -185,7 +185,7 @@ def build_pipeline( output_folder (Path): The folder where the filtered datasets will be saved. thresholds (dict[str, float]): A dictionary where keys are score names and values are the thresholds to filter samples. - hash_to_base_file (dict[str, Path]): A mapping from base file hashes to their corresponding paths. + hash_to_base_file_mapping_csv (Path): A CSV file mapping base file hashes to their corresponding paths. base_file_prefix (Path): The prefix path for the base files. tokenized_data_extension (str): The file extension for the tokenized data files. 
Returns: From 22dddeb002af5dbbbf89cbf35c81336a248c07cf Mon Sep 17 00:00:00 2001 From: alex-jude Date: Wed, 29 Oct 2025 11:57:12 +0100 Subject: [PATCH 05/10] refactor: removed reliance on file hashes in the score-based filtering pipeline and adapted the codebase for new changes from main --- .../example_filter_pipeline_config.yaml | 9 +- .../score_based_filtering/filter_pipeline.py | 28 +----- .../step_data_filtering.py | 2 +- .../step_score_parsing.py | 92 +++++++++++++------ 4 files changed, 70 insertions(+), 61 deletions(-) diff --git a/configs/data_processing/example_filter_pipeline_config.yaml b/configs/data_processing/example_filter_pipeline_config.yaml index 73156384..2b09417b 100644 --- a/configs/data_processing/example_filter_pipeline_config.yaml +++ b/configs/data_processing/example_filter_pipeline_config.yaml @@ -1,14 +1,13 @@ params: - score_path: /path/to/annotations - tokenized_data_path: /path/to/tokenized - output_folder: /path/to/filtered + score_path: /raid/s3/opengptx/jude/repos/ml_filter/data/filtering_folder/annotations + tokenized_data_path: /raid/s3/opengptx/jude/repos/ml_filter/data/filtering_folder/tokenized + output_folder: /raid/s3/opengptx/jude/repos/ml_filter/data/filtering_folder/outputs thresholds: score_Gemma_Snowflake: 3.0 score_Llama_Snowflake: 2.0 - hash_to_base_file_mapping_csv: path/to/hashes.csv - base_file_prefix: /path/to/raw + base_file_prefix: /raid/s3/opengptx/jude/repos/ml_filter/data/filtering_folder/annotations tokenized_data_extension: .pbin running_on_slurm: false diff --git a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py index ec1e5410..4b98fafa 100644 --- a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py +++ b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py @@ -70,7 +70,6 @@ def _build_pipeline(self) -> list[PipelineStep]: tokenized_data_path=self.params.tokenized_data_path, output_folder=self.params.output_folder, thresholds=self.params.thresholds, - hash_to_base_file_mapping_csv=self.params.hash_to_base_file_mapping_csv, base_file_prefix=self.params.base_file_prefix, tokenized_data_extension=self.params.tokenized_data_extension, ) @@ -102,9 +101,6 @@ class FilterPipelineParameters(BaseModel): thresholds: dict[str, float] = Field( ..., description="Dictionary where keys are score names and values are thresholds to filter samples." ) - hash_to_base_file_mapping_csv: Path = Field( - ..., description="CSV file mapping base file hashes to their corresponding paths." - ) base_file_prefix: Path = Field( default=Path(""), description="The prefix path for the raw/base files. This prefix will be removed " @@ -173,7 +169,6 @@ def build_pipeline( tokenized_data_path: Path, output_folder: Path, thresholds: dict[str, float], - hash_to_base_file_mapping_csv: Path, base_file_prefix: Path = Path(""), tokenized_data_extension: str = ".pbin", ) -> list[PipelineStep]: @@ -194,15 +189,10 @@ def build_pipeline( assert score_path.is_dir(), f"Score path {score_path} must be a directory." assert output_folder.is_dir(), f"Output folder {output_folder} must be a directory." assert len(thresholds) > 0, "At least one threshold must be provided." - assert ( - hash_to_base_file_mapping_csv.is_file() - ), f"Hash to base file mapping {hash_to_base_file_mapping_csv} must be a file." 
- hash_to_base_file = read_hash_to_base_file_mapping(hash_to_base_file_mapping_csv) pipeline: list[PipelineStep] = [ ScoresParser( data_folder=str(score_path), score_keys=list(thresholds.keys()), - hash_to_base_file=hash_to_base_file, tokenized_data_path=tokenized_data_path, base_file_prefix=base_file_prefix, tokenized_data_extension=tokenized_data_extension, @@ -215,24 +205,8 @@ def build_pipeline( ] return pipeline - -def read_hash_to_base_file_mapping(csv_file: Path) -> dict[str, Path]: - """ - Reads a CSV file containing a mapping from base file hashes to their corresponding paths. - Args: - csv_file (Path): The path to the CSV file. - Returns: - dict[str, Path]: A dictionary mapping base file hashes to their corresponding paths. - """ - hash_to_base_file: dict[str, Path] = {} - with open(csv_file, "r") as f: - reader = csv.DictReader(f) - for row in reader: - hash_to_base_file[row["md5"]] = Path(row["file_path"]) - return hash_to_base_file - - if __name__ == "__main__": + os.environ["FILTER_PIPELINE_YAML_FILE"] = str("/raid/s3/opengptx/jude/repos/ml_filter/ml_filter/configs/data_processing/example_filter_pipeline_config.yaml") if len(sys.argv) > 1 or not (yaml_file := os.getenv("FILTER_PIPELINE_YAML_FILE")) or not os.path.isfile(yaml_file): print( "This script is intended to be used with a YAML configuration " diff --git a/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py b/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py index 9f445316..d7e889aa 100644 --- a/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py +++ b/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py @@ -64,7 +64,7 @@ def _filter_document(self, document: Document): tokenized_file = Path(document["metadata"][ScoresParser.TOKENIZED_FILE_KEY]) output_path = self._prepare_output_path(tokenized_file) filter_func = make_filter_func(scores, self._thresholds) - filter_dataset(src_path=tokenized_file, dst_path=output_path, filter_func=filter_func, sample_key="input_ids") + filter_dataset(src_path=tokenized_file, dst_path=output_path, filter_func=filter_func) def _prepare_output_path(self, tokenized_file: Path) -> Path: tokenized_file_rel = tokenized_file.relative_to(self._tokenized_data_path) diff --git a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py index 3db1ae8f..5ddfeec1 100644 --- a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py +++ b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py @@ -1,4 +1,5 @@ import json +import logging from pathlib import Path from typing import Callable, Iterable, Literal @@ -26,7 +27,6 @@ def __init__( self, data_folder: DataFolderLike, score_keys: Iterable[str], - hash_to_base_file: dict[str, Path], tokenized_data_path: Path, base_file_prefix: Path = Path(""), tokenized_data_extension: str = ".pbin", @@ -61,7 +61,6 @@ def __init__( ) self._score_keys = list(score_keys) assert len(self._score_keys) > 0, "At least one score key must be provided." 
- self._hash_to_base_file = hash_to_base_file self._tokenized_data_path = tokenized_data_path self._base_file_prefix = base_file_prefix self._tokenized_data_extension = tokenized_data_extension @@ -76,8 +75,8 @@ def read_file(self, filepath: str) -> DocumentsPipeline: Returns: generator of Document """ - base_file_hash, scores_as_list = self._parse_scores_jsonl_file(filepath) - tokenized_data_path = self._map_to_tokenized_data_path(base_file_hash) + base_file_path_or_name, scores_as_list = self._parse_scores_jsonl_file(filepath) + tokenized_data_path = self._map_to_tokenized_data_path(base_file_path_or_name) doc_content = { "text": ".", # Text needs to be non-empty. self.SCORE_ENTRIES_KEY: scores_as_list, @@ -87,37 +86,74 @@ def read_file(self, filepath: str) -> DocumentsPipeline: return [document] def _parse_scores_jsonl_file(self, filepath: str) -> tuple[str, list[dict[str, float]]]: - scores_for_idx: dict[int, dict[str, float]] = {} - hashes: set[str] = set() + scores_for_document_idx: dict[str, dict[str, float]] = {} + processed_count = 0 + duplicate_counts: dict[str, int] = {} # track counts per original document_id + with self.data_folder.open(filepath, "r", compression=self._compression) as f: - for line in f: + for line_number, line in enumerate(f, start=1): + processed_count += 1 file_data = json.loads(line) - base_file_hash, document_idx = file_data["document_id"].rsplit("_", 1) - scores_for_idx[int(document_idx)] = {k: file_data[k] for k in self._score_keys} - hashes.add(base_file_hash) - self._verify_file_format(scores_for_idx, hashes) - scores_as_list = list(map(lambda x: x[1], sorted(scores_for_idx.items(), key=lambda x: x[0]))) - base_file_hash = next(iter(hashes)) - return base_file_hash, scores_as_list - - def _verify_file_format(self, scores_for_idx: dict[int, dict[str, float]], hashes: set[str]): - assert len(hashes) == 1, "All entries in the score file must refer to the same base file." - assert min(scores_for_idx.keys()) == 0 and max(scores_for_idx.keys()) + 1 == len( - scores_for_idx - ), "All indices in the score file must be continuous." - - def _map_to_tokenized_data_path(self, base_file_hash: str) -> Path: + document_id = file_data.get("document_id") + + if document_id in scores_for_document_idx: + # Generate a new unique ID with a numeric suffix to disambiguate duplicates. + dup_count = duplicate_counts.get(document_id, 0) + 1 + duplicate_counts[document_id] = dup_count + # Use underscore + count; ensure no collision with an existing (unlikely but safe guard if previous had suffix already) + new_id = f"{document_id}_{dup_count}" + while new_id in scores_for_document_idx: + dup_count += 1 + duplicate_counts[document_id] = dup_count + new_id = f"{document_id}_{dup_count}" + print( + f"Duplicate document_id '{document_id}' encountered at line {line_number} in {filepath}. Renamed to '{new_id}'." + ) + document_id = new_id + + scores_for_document_idx[document_id] = {k: float(file_data[k]) for k in self._score_keys} + + self._verify_unique_ids(filepath, scores_for_document_idx, processed_count) + scores_as_list = [scores for _, scores in sorted(scores_for_document_idx.items(), key=lambda x: x[0])] + return f.name, scores_as_list + + def _verify_unique_ids(self, filepath: str, scores_for_document_idx: dict[str, dict], processed_count: int): + """Verify that the number of unique document IDs matches the number of processed (valid) lines. + + Args: + filepath: Path to the scores JSONL file. + scores_for_document_idx: Mapping of document_id to its score dict. 
+ processed_count: Number of lines that contained a valid document_id. + """ + unique_ids = len(scores_for_document_idx) + if unique_ids != processed_count: + raise ValueError( + f"Mismatch in number of samples in scores file {filepath}: unique_ids={unique_ids} processed_lines={processed_count}." + ) + + + def _map_to_tokenized_data_path(self, base_file_path: Path | str) -> Path: """ - Maps a base file hash to the corresponding tokenized data path. + Maps a base file path to the corresponding tokenized data path. Args: - base_file_hash (str): The hash of the base file. + base_file_path (str): The path of the base file. Returns: Path: The path to the tokenized data file. """ - if base_file_hash not in self._hash_to_base_file: - raise ValueError(f"Base file hash {base_file_hash} not found in the provided hash mapping.") - base_file = self._hash_to_base_file[base_file_hash] - base_file_rel = base_file.relative_to(self._base_file_prefix) + if isinstance(base_file_path, str): + base_file_path = Path(base_file_path) + + # When prefix is effectively empty ("" or ".") just take the file name. + if str(self._base_file_prefix) in {"", "."}: + base_name = base_file_path.name # ensure we only use the filename portion + base_file_rel = Path(base_name) + else: + # Use relative_to only if possible; otherwise fall back to filename. + try: + base_file_rel = base_file_path.relative_to(self._base_file_prefix) + except Exception: + base_file_rel = Path(base_file_path.name) + tokenized_rel = base_file_rel.with_suffix(self._tokenized_data_extension) tokenized_data_path = self._tokenized_data_path / tokenized_rel if not tokenized_data_path.exists(): From e2d02f282abed379929341463140ccdff2f89ed9 Mon Sep 17 00:00:00 2001 From: alex-jude Date: Wed, 29 Oct 2025 11:57:42 +0100 Subject: [PATCH 06/10] test: add comprehensive tests for score-based filtering pipeline functionality --- .../test_filter_pipeline.py | 320 ++++++++++++++++++ 1 file changed, 320 insertions(+) create mode 100644 tests/score_based_filtering/test_filter_pipeline.py diff --git a/tests/score_based_filtering/test_filter_pipeline.py b/tests/score_based_filtering/test_filter_pipeline.py new file mode 100644 index 00000000..1a8fac9d --- /dev/null +++ b/tests/score_based_filtering/test_filter_pipeline.py @@ -0,0 +1,320 @@ +"""Tests for score-based filtering pipeline. + +Includes: + - Unit test for `make_filter_func` threshold logic. + - Parser test validating duplicate handling. + - Order preservation/content test (basic file existence & size checks). + - Full round-trip tokenize -> filter -> detokenize verification using real packed data format. + +Performance note: A simple tokenizer cache avoids repeatedly reloading the HF tokenizer across helper functions. +""" + +# Standard library imports +import json +import os +import sys +import shutil +import tempfile +import unittest +from pathlib import Path +from typing import List + +# Third-party imports +import numpy as np +from datatrove.executor import LocalPipelineExecutor + +# Local package imports: ensure src path is available. 
+_SRC_PATH = Path(__file__).resolve().parents[3] / "ml_filter" / "src" +if _SRC_PATH.exists() and str(_SRC_PATH) not in sys.path: + sys.path.insert(0, str(_SRC_PATH)) + +from ml_filter.data_processing.score_based_filtering.step_data_filtering import make_filter_func +from ml_filter.data_processing.score_based_filtering.step_score_parsing import ScoresParser +from ml_filter.data_processing.score_based_filtering.filter_pipeline import build_pipeline + +# --------------------------------------------------------------------------- +# Helper constants & functions +# --------------------------------------------------------------------------- + +HEADER_SIZE = 64 # Mimics EmbeddedStreamData.HEADER_SIZE_IN_BYTES (simplified for tests) +DATA_SECTION_LEN_BYTES = 8 +TOKEN_SIZE_DESC_LEN_BYTES = 4 + + +def _write_jsonl(records: list[dict], path: Path) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8") as f: + for r in records: + f.write(json.dumps(r, ensure_ascii=False) + "\n") + + +from typing import Any +_TOKENIZER_CACHE: dict[str, Any] = {} + + +def _get_tokenizer(name: str): + from transformers import AutoTokenizer + if name not in _TOKENIZER_CACHE: + _TOKENIZER_CACHE[name] = AutoTokenizer.from_pretrained(name) + return _TOKENIZER_CACHE[name] + + +def _tokenize_to_pbin(raw_jsonl: Path, tokenized_file: Path, tokenizer_name: str = "bert-base-multilingual-cased") -> None: + """Tokenizer writer producing a valid packed file using modalities header conventions.""" + import pickle + from modalities.dataloader.create_packed_data import ( + EmbeddedStreamData, + update_data_length_in_pre_allocated_header, + ) + + tokenizer = _get_tokenizer(tokenizer_name) + index_list: list[tuple[int, int]] = [] + tokenized_file.parent.mkdir(parents=True, exist_ok=True) + with raw_jsonl.open("r", encoding="utf-8") as f_in, tokenized_file.open("wb") as f_out: + # Pre-allocate header with zero data length then patch + f_out.write((0).to_bytes(EmbeddedStreamData.DATA_SECTION_LENGTH_IN_BYTES, byteorder="little")) + f_out.write((4).to_bytes(EmbeddedStreamData.TOKEN_SIZE_DESCRIPTOR_LENGTH_IN_BYTES, byteorder="little")) + header_bytes_written = ( + EmbeddedStreamData.DATA_SECTION_LENGTH_IN_BYTES + + EmbeddedStreamData.TOKEN_SIZE_DESCRIPTOR_LENGTH_IN_BYTES + ) + if header_bytes_written < EmbeddedStreamData.HEADER_SIZE_IN_BYTES: + f_out.write(b"\x00" * (EmbeddedStreamData.HEADER_SIZE_IN_BYTES - header_bytes_written)) + curr_offset = 0 + for line in f_in: + text = json.loads(line)["text"] + if not text.strip(): + continue + enc = tokenizer(text, truncation=True, max_length=None, add_special_tokens=True) + arr = np.array(enc["input_ids"], dtype=np.uint32) + bytes_chunk = arr.astype(" None: + """Create a minimal packed data file with given tokens per sample using modalities conventions.""" + import pickle + from modalities.dataloader.create_packed_data import ( + EmbeddedStreamData, + update_data_length_in_pre_allocated_header, + ) + path.parent.mkdir(parents=True, exist_ok=True) + index_list: list[tuple[int, int]] = [] + with path.open("wb") as f_out: + f_out.write((0).to_bytes(EmbeddedStreamData.DATA_SECTION_LENGTH_IN_BYTES, byteorder="little")) + f_out.write((4).to_bytes(EmbeddedStreamData.TOKEN_SIZE_DESCRIPTOR_LENGTH_IN_BYTES, byteorder="little")) + header_written = ( + EmbeddedStreamData.DATA_SECTION_LENGTH_IN_BYTES + + EmbeddedStreamData.TOKEN_SIZE_DESCRIPTOR_LENGTH_IN_BYTES + ) + if header_written < EmbeddedStreamData.HEADER_SIZE_IN_BYTES: + f_out.write(b"\x00" * 
(EmbeddedStreamData.HEADER_SIZE_IN_BYTES - header_written)) + curr_offset = 0 + for sample_tokens in tokens_per_sample: + arr = np.array(sample_tokens, dtype=np.uint32) + chunk = arr.astype(" list[str]: + """Load a packed .pbin file via modalities and detokenize each sample back to text using the tokenizer. + + This relies on the PackedMemMapDatasetBase interface to iterate samples. + """ + from modalities.dataloader.dataset import PackedMemMapDatasetBase + tokenizer = _get_tokenizer(tokenizer_name) + dataset = PackedMemMapDatasetBase(packed_file, sample_key="input_ids", load_index=True) + texts: list[str] = [] + for i in range(len(dataset)): + tokens = dataset[i]["input_ids"].tolist() + # Handle potential special tokens gracefully + decoded = tokenizer.decode(tokens, skip_special_tokens=True).strip() + texts.append(decoded) + return texts + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +# We'll monkeypatch filter_dataset used inside DataFiltering + +class TestMakeFilterFunc(unittest.TestCase): + def test_filter_func_threshold_logic(self): + scores = [ + {"score_A": 1.0, "score_B": 5.0}, + {"score_A": 2.5, "score_B": 4.9}, + {"score_A": 3.0, "score_B": 10.0}, + ] + thresholds = {"score_A": 2.0, "score_B": 5.0} + f = make_filter_func(scores, thresholds) + # Index 0 fails score_A + self.assertFalse(f((0, {}))) + # Index 1 fails score_B + self.assertFalse(f((1, {}))) + # Index 2 passes both + self.assertTrue(f((2, {}))) + + +class TestScoresParserDuplicates(unittest.TestCase): + def setUp(self): + self.tmp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp_dir) + self.scores_dir = Path(self.tmp_dir) / "scores" + self.scores_dir.mkdir(parents=True, exist_ok=True) + self.tokenized_dir = Path(self.tmp_dir) / "tokenized" + self.tokenized_dir.mkdir(parents=True, exist_ok=True) + (self.tokenized_dir / "file1.pbin").write_bytes(b"dummy") + score_lines = [ + '{"document_id": "file1", "score_A": 1.0, "score_B": 6.0}', + '{"document_id": "file1", "score_A": 2.0, "score_B": 7.0}', + '{"document_id": "file2", "score_A": 4.0, "score_B": 9.0}', + ] + (self.scores_dir / "file1.jsonl").write_text("\n".join(score_lines) + "\n", encoding="utf-8") + self.parser = ScoresParser( + data_folder=str(self.scores_dir), + score_keys=["score_A", "score_B"], + tokenized_data_path=self.tokenized_dir, + base_file_prefix=Path(""), + ) + + def test_parsing_handles_duplicate_ids(self): + docs_pipeline = self.parser.read_file("file1.jsonl") + self.assertEqual(len(docs_pipeline), 1) + doc = docs_pipeline[0] + metadata = doc.metadata + score_entries = metadata[ScoresParser.SCORE_ENTRIES_KEY] + # We expect 3 entries: file1, duplicate renamed to file1_1, and file2 + self.assertEqual(len(score_entries), 3) + expected_scores = [ + {"score_A": 1.0, "score_B": 6.0}, + {"score_A": 2.0, "score_B": 7.0}, + {"score_A": 4.0, "score_B": 9.0}, + ] + self.assertEqual(score_entries, expected_scores) + + + +class TestFilteringOrderPreservation(unittest.TestCase): + def setUp(self): + self.tmp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp_dir) + self.scores_dir = Path(self.tmp_dir) / "scores" + self.scores_dir.mkdir(parents=True, exist_ok=True) + self.tokenized_dir = Path(self.tmp_dir) / "tokenized" + self.tokenized_dir.mkdir(parents=True, exist_ok=True) + self.output_dir = Path(self.tmp_dir) / "outputs" + self.output_dir.mkdir(parents=True, exist_ok=True) + # Packed dataset with 5 
samples corresponding to order0..order4 + order_file = self.tokenized_dir / "order.pbin" + order_tokens = [ + [11, 12], + [13, 14, 15], + [16], + [17, 18], + [19, 20, 21], + ] + _write_minimal_packed(order_tokens, order_file) + self.sample_texts = ["alpha", "bravo", "charlie", "delta", "echo"] + scores_lines = [ + '{"document_id": "order0", "score_A": 1.0, "score_B": 5.0}', + '{"document_id": "order1", "score_A": 2.0, "score_B": 5.0}', + '{"document_id": "order2", "score_A": 2.0, "score_B": 2.0}', + '{"document_id": "order3", "score_A": 5.0, "score_B": 5.0}', + '{"document_id": "order4", "score_A": 3.0, "score_B": 7.0}', + ] + (self.scores_dir / "order.jsonl").write_text("\n".join(scores_lines) + "\n", encoding="utf-8") + self.thresholds = {"score_A": 2.0, "score_B": 5.0} + self.expected_passing_indices = [1, 3, 4] + # Use real filter_dataset; we'll inspect size not textual content. + def test_filtered_order_and_content(self): + pipeline = build_pipeline( + score_path=self.scores_dir, + tokenized_data_path=self.tokenized_dir, + output_folder=self.output_dir, + thresholds=self.thresholds, + base_file_prefix=Path(""), + tokenized_data_extension=".pbin", + ) + executor = LocalPipelineExecutor(pipeline=pipeline) + executor.run() + filtered_files = list(self.output_dir.rglob("order.filtered.pbin")) + self.assertEqual(len(filtered_files), 1) + # Basic sanity: filtered file should be non-empty + self.assertGreater(filtered_files[0].stat().st_size, 0, "Filtered file is unexpectedly empty.") + + +class TestTokenizeFilterDetokenizeRoundTrip(unittest.TestCase): + """End-to-end round trip using inlined tokenizer logic and monkeypatched filtering.""" + + def setUp(self): + self.tmp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp_dir) + self.raw_dir = Path(self.tmp_dir) / "raw" + self.tokenized_dir = Path(self.tmp_dir) / "tokenized" + self.output_dir = Path(self.tmp_dir) / "outputs" + self.scores_dir = Path(self.tmp_dir) / "scores" + for p in (self.raw_dir, self.tokenized_dir, self.output_dir, self.scores_dir): + p.mkdir(parents=True, exist_ok=True) + + # Samples and scores definitions + self.samples = [ + {"text": "hello world"}, # fail A + {"text": "bonjour le monde"}, # pass + {"text": "hola mundo"}, # pass + {"text": "ciao mondo"}, # fail B + ] + self.raw_jsonl = self.raw_dir / "sample.jsonl" + _write_jsonl(self.samples, self.raw_jsonl) + scores_lines = [ + '{"document_id": "sample0", "score_A": 1.0, "score_B": 6.0}', + '{"document_id": "sample1", "score_A": 2.0, "score_B": 5.0}', + '{"document_id": "sample2", "score_A": 3.0, "score_B": 10.0}', + '{"document_id": "sample3", "score_A": 5.0, "score_B": 2.0}', + ] + (self.scores_dir / "sample.jsonl").write_text("\n".join(scores_lines) + "\n", encoding="utf-8") + + # Tokenize + tokenized_file = self.tokenized_dir / "sample.pbin" + _tokenize_to_pbin(self.raw_jsonl, tokenized_file) + + # Real filter_dataset will be used; we'll parse output later to approximate pass count via index length. 
+ self.expected_texts = [self.samples[1]["text"], self.samples[2]["text"]] + + def test_round_trip_filtering(self): + pipeline = build_pipeline( + score_path=self.scores_dir, + tokenized_data_path=self.tokenized_dir, + output_folder=self.output_dir, + thresholds={"score_A": 2.0, "score_B": 5.0}, + base_file_prefix=Path(""), + tokenized_data_extension=".pbin", + ) + executor = LocalPipelineExecutor(pipeline=pipeline) + executor.run() + filtered_files = list(self.output_dir.rglob("sample.filtered.pbin")) + self.assertEqual(len(filtered_files), 1) + self.assertGreater(filtered_files[0].stat().st_size, 0) + + # Detokenize filtered file and verify the surviving texts correspond exactly to expected_texts + detok_texts = _detokenize_packed(filtered_files[0]) + # Because the filtering operates on original sample indices, surviving samples should map directly + # Assert order and content match expected + self.assertEqual(len(detok_texts), len(self.expected_texts)) + # Simple containment & order check (exact match) + self.assertEqual(detok_texts, self.expected_texts, "Detokenized filtered texts do not match expected output") + + +if __name__ == "__main__": # Allow running this file directly. + unittest.main() From 936462a7c8ac3ba09c5cccb897730ba8a3ed2ff6 Mon Sep 17 00:00:00 2001 From: alex-jude Date: Wed, 29 Oct 2025 21:45:34 +0100 Subject: [PATCH 07/10] chore: remove hardcoded YAML file path from main execution block --- .../data_processing/score_based_filtering/filter_pipeline.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py index 4b98fafa..2bc02076 100644 --- a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py +++ b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py @@ -1,6 +1,5 @@ from __future__ import annotations -import csv import os import sys from pathlib import Path @@ -206,7 +205,6 @@ def build_pipeline( return pipeline if __name__ == "__main__": - os.environ["FILTER_PIPELINE_YAML_FILE"] = str("/raid/s3/opengptx/jude/repos/ml_filter/ml_filter/configs/data_processing/example_filter_pipeline_config.yaml") if len(sys.argv) > 1 or not (yaml_file := os.getenv("FILTER_PIPELINE_YAML_FILE")) or not os.path.isfile(yaml_file): print( "This script is intended to be used with a YAML configuration " From 6bb08f7cdc2f4fa9401b7da4c410f4aaae982ca7 Mon Sep 17 00:00:00 2001 From: ajude2s Date: Thu, 30 Oct 2025 15:48:54 +0100 Subject: [PATCH 08/10] feat: add Slurm configuration files for filtering pipeline and update execution settings --- ...> lorem_ipsum_filter_pipeline_config.yaml} | 0 ...em_ipsum_filter_pipeline_config_slurm.yaml | 34 ++++++++++++++++++ .../score_based_filtering/filter_pipeline.py | 36 ++++++++++++++----- .../step_score_parsing.py | 3 -- 4 files changed, 61 insertions(+), 12 deletions(-) rename configs/data_processing/{example_filter_pipeline_config.yaml => lorem_ipsum_filter_pipeline_config.yaml} (100%) create mode 100644 configs/data_processing/lorem_ipsum_filter_pipeline_config_slurm.yaml diff --git a/configs/data_processing/example_filter_pipeline_config.yaml b/configs/data_processing/lorem_ipsum_filter_pipeline_config.yaml similarity index 100% rename from configs/data_processing/example_filter_pipeline_config.yaml rename to configs/data_processing/lorem_ipsum_filter_pipeline_config.yaml diff --git a/configs/data_processing/lorem_ipsum_filter_pipeline_config_slurm.yaml 
b/configs/data_processing/lorem_ipsum_filter_pipeline_config_slurm.yaml new file mode 100644 index 00000000..d50f17bb --- /dev/null +++ b/configs/data_processing/lorem_ipsum_filter_pipeline_config_slurm.yaml @@ -0,0 +1,34 @@ +params: + score_path: /leonardo_work/EUHPC_D21_101/alexj/repos/data/annotations + tokenized_data_path: /leonardo_work/EUHPC_D21_101/alexj/repos/data/tokenized + output_folder: /leonardo_work/EUHPC_D21_101/alexj/repos/data/outputs + + thresholds: + score_Gemma_Snowflake: 3.0 + score_Llama_Snowflake: 2.0 + + base_file_prefix: /leonardo_work/EUHPC_D21_101/alexj/repos/data/annotations + tokenized_data_extension: .pbin + +running_on_slurm: true + +local_settings: null + +slurm_settings: + sbatch_args: + account: "EUHPC_E05_119" + nodes: 1 + ntasks: 1 + gres: gpu:1 + partition: "boost_usr_prod" + time: "00:30:00" + cpus_per_task: 32 + gpus_per_task: 1 + mem_per_gpu: "8G" + job_name: "lorem_ipsum_filtering" + output: /data/cat/ws/alju972f-annotation_at_scale/.vscode/data/embedding_output_dir/scripts/slurm_output/%j.out + error: /data/cat/ws/alju972f-annotation_at_scale/.vscode/data/embedding_output_dir/scripts/slurm_output/%j.err + qos: "boost_qos_dbg" #"normal" + venv_path: /leonardo_work/EUHPC_D21_101/alexj/repos/scripts/env/venv_annotation_pipeline/bin/activate + tasks: 1 + workers: 1 \ No newline at end of file diff --git a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py index 2bc02076..5982e49d 100644 --- a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py +++ b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py @@ -121,24 +121,23 @@ class LocalExecutionSettings(BaseModel): class SlurmExecutionSettings(BaseModel): """Settings for running the pipeline on a Slurm cluster.""" - tasks: int = 1 - time: str = "00:15:00" + time: str = "00:30:00" partition: str = "default" - account: str | None = None # FIXME is this supported? - cpus_per_task: int = 1 - mem_per_cpu_gb: int = 2 + cpus_per_task: int = 4 + mem_per_cpu_gb: int = 8 workers: int = -1 - job_name: str = "data_processing" + job_name: str = "filtering_pipeline" qos: str = "normal" env_command: str | None = None condaenv: str | None = None venv_path: str | None = None - sbatch_args: dict[str, str] | None = None + # Allow users to supply any sbatch arg (e.g. nodes, ntasks, gres, account, output, error, gpus-per-task, etc.) + # using either snake_case or dash-case. Primitive values get coerced to strings. + sbatch_args: dict[str, str | int | float | bool] | None = None max_array_size: int = 1001 depends_job_id: str | None = None job_id_position: int = -1 - # job_id_retriever: Callable | None = None logging_dir: str | None = None skip_completed: bool = True slurm_logs_folder: str | None = None @@ -150,9 +149,28 @@ class SlurmExecutionSettings(BaseModel): mail_type: str = "ALL" mail_user: str | None = None requeue: bool = True - srun_args: dict[str, str] | None = None + srun_args: dict[str, str | int | float | bool] | None = None tasks_per_job: int = 1 + @model_validator(mode="before") + def _normalize_sbatch(cls, values): # type: ignore[override] + """Normalize sbatch_args only. 
+ + - Accept numeric/bool types and coerce to string + - Fold common top-level keys (output, error, gpus_per_task) into sbatch_args + - Convert snake_case keys to dash-case + """ + from omegaconf import DictConfig as _DictConfig # local import + + sbatch_args = values.get("sbatch_args") or {} + if isinstance(sbatch_args, _DictConfig): + sbatch_args = OmegaConf.to_container(sbatch_args, resolve=True) # type: ignore[arg-type] + if not isinstance(sbatch_args, dict): + raise TypeError(f"sbatch_args must be a mapping if provided (got type {type(sbatch_args)})") + + values["sbatch_args"] = sbatch_args + return values + def run_pipeline(args: FilterPipelineBuilder) -> None: """Runs a datatrove pipeline to filter datasets based on scores. diff --git a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py index 5ddfeec1..2cb5e04c 100644 --- a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py +++ b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py @@ -106,9 +106,6 @@ def _parse_scores_jsonl_file(self, filepath: str) -> tuple[str, list[dict[str, f dup_count += 1 duplicate_counts[document_id] = dup_count new_id = f"{document_id}_{dup_count}" - print( - f"Duplicate document_id '{document_id}' encountered at line {line_number} in {filepath}. Renamed to '{new_id}'." - ) document_id = new_id scores_for_document_idx[document_id] = {k: float(file_data[k]) for k in self._score_keys} From 3a5c21e7cbdff6336e5f8fb5b72af240de3c8d0d Mon Sep 17 00:00:00 2001 From: alex-jude Date: Tue, 4 Nov 2025 12:56:50 +0100 Subject: [PATCH 09/10] refactor: clean up imports and remove unused code in test_filter_pipeline.py --- .../test_filter_pipeline.py | 22 ++++--------------- 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/tests/score_based_filtering/test_filter_pipeline.py b/tests/score_based_filtering/test_filter_pipeline.py index 1a8fac9d..b0cda540 100644 --- a/tests/score_based_filtering/test_filter_pipeline.py +++ b/tests/score_based_filtering/test_filter_pipeline.py @@ -11,23 +11,16 @@ # Standard library imports import json -import os -import sys import shutil import tempfile import unittest from pathlib import Path -from typing import List +from typing import Any # Third-party imports import numpy as np from datatrove.executor import LocalPipelineExecutor -# Local package imports: ensure src path is available. 
From 3a5c21e7cbdff6336e5f8fb5b72af240de3c8d0d Mon Sep 17 00:00:00 2001
From: alex-jude
Date: Tue, 4 Nov 2025 12:56:50 +0100
Subject: [PATCH 09/10] refactor: clean up imports and remove unused code in
 test_filter_pipeline.py

---
 .../test_filter_pipeline.py                   | 22 ++++------------------
 1 file changed, 4 insertions(+), 18 deletions(-)

diff --git a/tests/score_based_filtering/test_filter_pipeline.py b/tests/score_based_filtering/test_filter_pipeline.py
index 1a8fac9d..b0cda540 100644
--- a/tests/score_based_filtering/test_filter_pipeline.py
+++ b/tests/score_based_filtering/test_filter_pipeline.py
@@ -11,23 +11,16 @@
 
 # Standard library imports
 import json
-import os
-import sys
 import shutil
 import tempfile
 import unittest
 from pathlib import Path
-from typing import List
+from typing import Any
 
 # Third-party imports
 import numpy as np
 from datatrove.executor import LocalPipelineExecutor
 
-# Local package imports: ensure src path is available.
-_SRC_PATH = Path(__file__).resolve().parents[3] / "ml_filter" / "src"
-if _SRC_PATH.exists() and str(_SRC_PATH) not in sys.path:
-    sys.path.insert(0, str(_SRC_PATH))
-
 from ml_filter.data_processing.score_based_filtering.step_data_filtering import make_filter_func
 from ml_filter.data_processing.score_based_filtering.step_score_parsing import ScoresParser
 from ml_filter.data_processing.score_based_filtering.filter_pipeline import build_pipeline
@@ -35,6 +28,7 @@
 # ---------------------------------------------------------------------------
 # Helper constants & functions
 # ---------------------------------------------------------------------------
+_TOKENIZER_CACHE: dict[str, Any] = {}
 
 HEADER_SIZE = 64  # Mimics EmbeddedStreamData.HEADER_SIZE_IN_BYTES (simplified for tests)
 DATA_SECTION_LEN_BYTES = 8
@@ -48,10 +42,6 @@ def _write_jsonl(records: list[dict], path: Path) -> None:
         f.write(json.dumps(r, ensure_ascii=False) + "\n")
 
 
-from typing import Any
-_TOKENIZER_CACHE: dict[str, Any] = {}
-
-
 def _get_tokenizer(name: str):
     from transformers import AutoTokenizer
     if name not in _TOKENIZER_CACHE:
@@ -147,9 +137,6 @@ def _detokenize_packed(packed_file: Path, tokenizer_name: str = "bert-base-multi
 # ---------------------------------------------------------------------------
 # Tests
 # ---------------------------------------------------------------------------
-
-# We'll monkeypatch filter_dataset used inside DataFiltering
-
 class TestMakeFilterFunc(unittest.TestCase):
     def test_filter_func_threshold_logic(self):
         scores = [
@@ -205,7 +192,6 @@ def test_parsing_handles_duplicate_ids(self):
 
         self.assertEqual(score_entries, expected_scores)
 
-
 class TestFilteringOrderPreservation(unittest.TestCase):
     def setUp(self):
         self.tmp_dir = tempfile.mkdtemp()
@@ -309,12 +295,12 @@ def test_round_trip_filtering(self):
 
         # Detokenize filtered file and verify the surviving texts correspond exactly to expected_texts
         detok_texts = _detokenize_packed(filtered_files[0])
-        # Because the filtering operates on original sample indices, surviving samples should map directly
+        # Assert order and content match expected
        self.assertEqual(len(detok_texts), len(self.expected_texts))
 
         # Simple containment & order check (exact match)
         self.assertEqual(detok_texts, self.expected_texts, "Detokenized filtered texts do not match expected output")
 
 
-if __name__ == "__main__":  # Allow running this file directly.
+if __name__ == "__main__":
     unittest.main()
From a0698c221e583046394cf97b152789887d3b02bf Mon Sep 17 00:00:00 2001
From: alex-jude
Date: Tue, 4 Nov 2025 19:11:47 +0100
Subject: [PATCH 10/10] fix: enhance ScoresParser to preserve original document
 order and handle duplicates in score parsing

---
 .../step_score_parsing.py                     | 31 ++++++++++------
 .../test_filter_pipeline.py                   | 35 +++++++++++++++++++
 2 files changed, 55 insertions(+), 11 deletions(-)

diff --git a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py
index 2cb5e04c..327bfa06 100644
--- a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py
+++ b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py
@@ -85,10 +85,18 @@ def read_file(self, filepath: str) -> DocumentsPipeline:
         document = self.get_document_from_dict(doc_content, filepath, 0)
         return [document]
 
-    def _parse_scores_jsonl_file(self, filepath: str) -> tuple[str, list[dict[str, float]]]:
-        scores_for_document_idx: dict[str, dict[str, float]] = {}
-        processed_count = 0
+    def _parse_scores_jsonl_file(
+        self, filepath: str
+    ) -> tuple[str, list[dict[str, float]]]:
+        """
+        Parse a JSONL file containing scores for documents.
+
+        Preserves the original file order, even when duplicate document_ids exist;
+        duplicates are disambiguated with numeric suffixes without reordering entries.
+        """
+        scores_for_document_idx: list[tuple[str, dict[str, float]]] = []
         duplicate_counts: dict[str, int] = {}  # track counts per original document_id
+        processed_count = 0
 
         with self.data_folder.open(filepath, "r", compression=self._compression) as f:
             for line_number, line in enumerate(f, start=1):
@@ -96,23 +104,24 @@ def _parse_scores_jsonl_file(self, filepath: str) -> tuple[str, list[dict[str, f
                 file_data = json.loads(line)
                 document_id = file_data.get("document_id")
 
-                if document_id in scores_for_document_idx:
-                    # Generate a new unique ID with a numeric suffix to disambiguate duplicates.
+                if any(doc_id == document_id or doc_id.startswith(f"{document_id}_") for doc_id, _ in scores_for_document_idx):
+                    # Generate a new unique ID for duplicates.
                     dup_count = duplicate_counts.get(document_id, 0) + 1
                     duplicate_counts[document_id] = dup_count
-                    # Use underscore + count; ensure no collision with an existing (unlikely but safe guard if previous had suffix already)
                     new_id = f"{document_id}_{dup_count}"
-                    while new_id in scores_for_document_idx:
+                    while any(doc_id == new_id for doc_id, _ in scores_for_document_idx):
                         dup_count += 1
                         duplicate_counts[document_id] = dup_count
                         new_id = f"{document_id}_{dup_count}"
                     document_id = new_id
 
-                scores_for_document_idx[document_id] = {k: float(file_data[k]) for k in self._score_keys}
+                # Append to a list (not a dict) to preserve the original file order.
+                scores_for_document_idx.append(
+                    (document_id, {k: float(file_data[k]) for k in self._score_keys})
+                )
+        self._verify_unique_ids(filepath, dict(scores_for_document_idx), processed_count)
+        return filepath, [score_dict for _, score_dict in scores_for_document_idx]
 
-        self._verify_unique_ids(filepath, scores_for_document_idx, processed_count)
-        scores_as_list = [scores for _, scores in sorted(scores_for_document_idx.items(), key=lambda x: x[0])]
-        return f.name, scores_as_list
 
     def _verify_unique_ids(self, filepath: str, scores_for_document_idx: dict[str, dict], processed_count: int):
         """Verify that the number of unique document IDs matches the number of processed (valid) lines.
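The change above replaces the id-keyed dict (whose items were previously sorted by document_id on return) with a list of (document_id, scores) tuples, so scores come back in file order even when ids collide. A stripped-down, standalone sketch of the same idea, simplified to use a set for the membership checks that the patch performs with linear scans over the list:

    # Standalone sketch of order-preserving duplicate disambiguation (simplified; uses a
    # set for membership instead of re-scanning the list as the patched code does).
    def disambiguate(rows: list[tuple[str, dict[str, float]]]) -> list[tuple[str, dict[str, float]]]:
        seen: set[str] = set()
        duplicate_counts: dict[str, int] = {}
        result: list[tuple[str, dict[str, float]]] = []
        for document_id, scores in rows:
            if document_id in seen:
                dup_count = duplicate_counts.get(document_id, 0) + 1
                new_id = f"{document_id}_{dup_count}"
                while new_id in seen:
                    dup_count += 1
                    new_id = f"{document_id}_{dup_count}"
                duplicate_counts[document_id] = dup_count
                document_id = new_id
            seen.add(document_id)
            result.append((document_id, scores))
        return result

    rows = [("doc", {"score_A": 1.0}), ("doc", {"score_A": 2.0}), ("other", {"score_A": 3.0})]
    assert [doc_id for doc_id, _ in disambiguate(rows)] == ["doc", "doc_1", "other"]
    assert [s["score_A"] for _, s in disambiguate(rows)] == [1.0, 2.0, 3.0]  # file order kept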
diff --git a/tests/score_based_filtering/test_filter_pipeline.py b/tests/score_based_filtering/test_filter_pipeline.py
index b0cda540..543bb1ba 100644
--- a/tests/score_based_filtering/test_filter_pipeline.py
+++ b/tests/score_based_filtering/test_filter_pipeline.py
@@ -302,5 +302,40 @@ def test_round_trip_filtering(self):
         self.assertEqual(detok_texts, self.expected_texts, "Detokenized filtered texts do not match expected output")
 
 
+class TestScoresParserOrdering(unittest.TestCase):
+    def setUp(self):
+        self.tmp_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tmp_dir)
+        self.scores_dir = Path(self.tmp_dir) / "scores"
+        self.tokenized_dir = Path(self.tmp_dir) / "tokenized"
+        self.scores_dir.mkdir(parents=True, exist_ok=True)
+        self.tokenized_dir.mkdir(parents=True, exist_ok=True)
+        (self.tokenized_dir / "samples.pbin").write_bytes(b"dummy")
+
+        scores_lines = [
+            '{"document_id": "sample1", "score_A": 1.0}',
+            '{"document_id": "sample2", "score_A": 2.0}',
+            '{"document_id": "sample10", "score_A": 10.0}',
+        ]
+        (self.scores_dir / "samples.jsonl").write_text("\n".join(scores_lines) + "\n", encoding="utf-8")
+
+        self.parser = ScoresParser(
+            data_folder=str(self.scores_dir),
+            score_keys=["score_A"],
+            tokenized_data_path=self.tokenized_dir,
+            base_file_prefix=Path(""),
+        )
+
+    def test_preserves_original_document_order(self):
+        docs_pipeline = self.parser.read_file("samples.jsonl")
+        self.assertEqual(len(docs_pipeline), 1)
+        score_entries = docs_pipeline[0].metadata[ScoresParser.SCORE_ENTRIES_KEY]
+        expected_scores = [
+            {"score_A": 1.0},
+            {"score_A": 2.0},
+            {"score_A": 10.0},
+        ]
+        self.assertEqual(score_entries, expected_scores)
+
 if __name__ == "__main__":
     unittest.main()
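A note on the test data above: the ids sample1, sample2 and sample10 are chosen so that the old behaviour (sorting document_ids as strings before returning the scores) would visibly scramble the order, which is exactly what the new assertion guards against:

    # Sorting the ids used in TestScoresParserOrdering as strings puts "sample10"
    # before "sample2", i.e. not in file order.
    ids = ["sample1", "sample2", "sample10"]
    assert sorted(ids) == ["sample1", "sample10", "sample2"]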