1 change: 0 additions & 1 deletion .gitignore
@@ -1,7 +1,6 @@
#
/weights/
data/
pipelines/
cache_dir/

# pycharm
13 changes: 12 additions & 1 deletion DPF/filters/data_filter.py
@@ -1,3 +1,5 @@
import multiprocessing
import sys
from abc import ABC, abstractmethod
from typing import Any

@@ -19,6 +21,7 @@ def __init__(self, pbar: bool, _pbar_position: int = 0):
super().__init__()
self.pbar = pbar
self.pbar_position = _pbar_position
self._created_by_multigpu_data_filter = False

@property
def schema(self) -> list[str]:
@@ -120,7 +123,15 @@ def run(self, dataset: Dataset[tuple[bool, Any]]) -> pd.DataFrame:
pd.DataFrame
Dataframe with columns from schema property
"""
dataloader = DataLoader(dataset, collate_fn=identical_collate_fn, **self.dataloader_kwargs)
multiprocessing_context = None
if self._created_by_multigpu_data_filter and sys.platform not in {'win32', 'darwin'}:
multiprocessing_context = multiprocessing.get_context('fork')

dataloader = DataLoader(
dataset, collate_fn=identical_collate_fn,
multiprocessing_context=multiprocessing_context,
**self.dataloader_kwargs
)
filter_results = self._get_dict_from_schema()

for batch in tqdm(dataloader, disable=not self.pbar, position=self.pbar_position):
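
The intent of this change, as far as the diff shows: a DataFilter running inside a MultiGPUDataFilter worker already lives in a spawned process, and spawning DataLoader workers from there would re-import the interpreter a second time, so on Linux the inner loaders fork instead. Below is a self-contained sketch of the same context selection; the toy dataset and worker counts are illustrative, not part of the PR.

```python
import multiprocessing
import sys

from torch.utils.data import DataLoader, Dataset


class RangeDataset(Dataset):
    """Toy stand-in for DPF's real dataset classes."""

    def __len__(self) -> int:
        return 8

    def __getitem__(self, idx: int) -> int:
        return idx


if __name__ == "__main__":
    # 'fork' is only a reasonable default on Linux: Windows has no fork and
    # macOS uses spawn by default, hence the platform check in DataFilter.run.
    mp_context = None
    if sys.platform not in {"win32", "darwin"}:
        mp_context = multiprocessing.get_context("fork")

    loader = DataLoader(
        RangeDataset(),
        batch_size=2,
        num_workers=2,
        multiprocessing_context=mp_context,
    )
    print([batch.tolist() for batch in loader])  # [[0, 1], [2, 3], [4, 5], [6, 7]]
```
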
53 changes: 53 additions & 0 deletions DPF/filters/images/dummy_gpu_filter.py
@@ -0,0 +1,53 @@
from typing import Any, Union

import torch

from DPF.types import ModalityToDataMapping

from .img_filter import ImageFilter


class DummyGPUFilter(ImageFilter):
"""
DummyGPUFilter class for testing purposes
"""

def __init__(
self,
workers: int = 16,
device: Union[str, torch.device] = "cuda",
pbar: bool = True,
_pbar_position: int = 0
):
super().__init__(pbar, _pbar_position)
self.num_workers = workers
self.device = device

@property
def result_columns(self) -> list[str]:
return ["dummy_label",]

@property
def dataloader_kwargs(self) -> dict[str, Any]:
return {
"num_workers": self.num_workers,
"batch_size": 1,
"drop_last": False,
}

def preprocess_data(
self,
modality2data: ModalityToDataMapping,
metadata: dict[str, Any]
) -> Any:
key = metadata[self.key_column]
return key, 1

def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]:
df_batch_labels = self._get_dict_from_schema()

keys, dummy_labels = list(zip(*batch))
df_batch_labels[self.key_column].extend(keys)
df_batch_labels[self.result_columns[0]].extend(dummy_labels)

return df_batch_labels
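
An illustrative sketch (not from the PR) of exercising DummyGPUFilter's preprocess/process plumbing directly, without a dataset on disk; the metadata key value "img_0.jpg" is made up. In practice the filter would be applied through processor.apply_data_filter(datafilter), as run_one_process does in multigpu_filter.py below.

```python
from DPF.filters.images.dummy_gpu_filter import DummyGPUFilter

datafilter = DummyGPUFilter(workers=0, device="cpu", pbar=False)

# preprocess_data returns a (key, dummy_label) pair; process_batch packs a
# list of such pairs into the schema dict that DataFilter.run later turns
# into the result dataframe.
sample = datafilter.preprocess_data(
    modality2data={},
    metadata={datafilter.key_column: "img_0.jpg"},
)
print(datafilter.process_batch([sample]))
```
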
13 changes: 12 additions & 1 deletion DPF/filters/multigpu_filter.py
@@ -29,6 +29,7 @@ def run_one_process(
reader = DatasetReader(connector=connector)
processor = reader.from_df(config, df)
datafilter = filter_class(**filter_kwargs, _pbar_position=i, device=device) # type: ignore
datafilter._created_by_multigpu_data_filter = True
processor.apply_data_filter(datafilter, **filter_run_kwargs)
res = processor.df
res.set_index(index, inplace=True)
@@ -61,6 +62,16 @@ def __init__(
self.devices = devices
self.num_parts = len(devices)

# getting result column names
datafilter = self.filter_class(**self.filter_params, device=devices[0]) # type: ignore
self._result_columns = datafilter.result_columns
del datafilter
torch.cuda.empty_cache()

@property
def result_columns(self) -> list[str]:
return self._result_columns

def run(
self,
df: pd.DataFrame,
@@ -108,8 +119,8 @@ def run(
)

processes = []
context = multiprocessing.get_context('spawn')
for param in params:
context = multiprocessing.get_context('spawn')
p = context.Process(target=run_one_process, args=param)
p.start()
processes.append(p)
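
With result_columns now computed eagerly in __init__, a MultiGPUDataFilter can report its output schema before any worker process starts. A hypothetical construction sketch follows; the keyword names are taken from what FilterPipeline.add_datafilter passes further down in this PR, and the full constructor signature is not visible in this diff, so treat them as assumptions.

```python
from DPF.filters.images.dummy_gpu_filter import DummyGPUFilter
from DPF.filters.multigpu_filter import MultiGPUDataFilter

multigpu_datafilter = MultiGPUDataFilter(
    devices=["cuda:0", "cuda:1"],
    datafilter_class=DummyGPUFilter,
    datafilter_params={"workers": 4},
)

# Known up front because __init__ builds a throwaway filter on devices[0],
# reads its result_columns and then frees the GPU memory again.
print(multigpu_datafilter.result_columns)  # ['dummy_label']
```
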
4 changes: 3 additions & 1 deletion DPF/filters/videos/lita_filter.py
@@ -82,6 +82,8 @@ def __init__(
pretrainers = load_pretrained_model(weights_path, model_base, self.model_name, load_8bit, load_4bit)
self.tokenizer, self.model, self.processor, self.context_len = pretrainers

self.model_num_frames = self.model.config.num_frames

self.conv_mode = "llava_v1"
self.conv = conv_templates[self.conv_mode].copy()

@@ -118,7 +120,7 @@ def preprocess_data(
) -> Any:
key = metadata[self.key_column]
video_file = BytesIO(modality2data['video'])
video_file = load_video(video_file, self.processor, self.model.config.num_frames).unsqueeze(0).half()
video_file = load_video(video_file, self.processor, self.model_num_frames).unsqueeze(0).half()
return key, video_file

def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]:
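
Caching num_frames on the filter means preprocess_data no longer dereferences self.model at all; since preprocessing typically runs in DataLoader worker processes (now possibly forked, see data_filter.py above), keeping a plain int on the filter looks cheaper and safer than reaching into the loaded model's config per sample. The pattern in isolation, with made-up names and a dummy model object:

```python
from types import SimpleNamespace


class VideoFilterSketch:
    """Not the real LITAFilter; just the init-time caching pattern."""

    def __init__(self, model):
        self.model = model
        # Copy the scalar once in __init__; per-sample preprocessing (which may
        # run in forked DataLoader workers) then never touches self.model.
        self.model_num_frames = model.config.num_frames

    def frames_for_sample(self) -> int:
        return self.model_num_frames


dummy_model = SimpleNamespace(config=SimpleNamespace(num_frames=100))
print(VideoFilterSketch(dummy_model).frames_for_sample())  # 100
```
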
1 change: 1 addition & 0 deletions DPF/pipelines/__init__.py
@@ -0,0 +1 @@
from .filter_pipeline import FilterPipeline
148 changes: 148 additions & 0 deletions DPF/pipelines/filter_pipeline.py
@@ -0,0 +1,148 @@
from dataclasses import dataclass
from typing import Any, Callable, Optional

import pandas as pd

from DPF.filters import ColumnFilter, DataFilter
from DPF.filters.multigpu_filter import MultiGPUDataFilter
from DPF.processors import DatasetProcessor
from DPF.utils.logger import init_logger, init_stdout_logger

from .pipeline_stages import (
DataFramePipelineStage,
DeduplicationPipelineStage,
FilterPipelineStage,
PipelineStage,
ShufflePipelineStage,
)
from .types import OnErrorOptions


@dataclass
class PipelineStageRunner:
stage: PipelineStage
on_error: OnErrorOptions


class FilterPipeline:

def __init__(self, pipeline_name: str, logs_dir: Optional[str] = None):
self.pipeline_name = pipeline_name
self.stages: list[PipelineStageRunner] = []
if logs_dir:
self.logger = init_logger(pipeline_name, logging_dir=logs_dir)
else:
self.logger = init_stdout_logger()

def add_datafilter(
self,
datafilter: type[DataFilter],
datafilter_kwargs: dict[str, Any],
devices: Optional[list[str]] = None,
processor_run_kwargs: Optional[dict[str, Any]] = None,
on_error: OnErrorOptions = "stop",
skip_if_columns_exist: bool = True
) -> None:
if processor_run_kwargs is None:
processor_run_kwargs = {}

if devices is None:
stage = FilterPipelineStage(
'datafilter', filter_class=datafilter,
filter_kwargs=datafilter_kwargs, processor_run_kwargs=processor_run_kwargs,
skip_if_columns_exist=skip_if_columns_exist
)
elif len(devices) == 1:
new_kwargs = datafilter_kwargs.copy()
new_kwargs['device'] = devices[0]
stage = FilterPipelineStage(
'datafilter', filter_class=datafilter,
filter_kwargs=new_kwargs, processor_run_kwargs=processor_run_kwargs,
skip_if_columns_exist=skip_if_columns_exist
)
else:
stage = FilterPipelineStage(
'multigpufilter', filter_class=MultiGPUDataFilter,
filter_kwargs={
"devices": devices,
"datafilter_class": datafilter,
"datafilter_params": datafilter_kwargs
},
processor_run_kwargs=processor_run_kwargs,
skip_if_columns_exist=skip_if_columns_exist
)

self.stages.append(
PipelineStageRunner(stage, on_error=on_error)
)

def add_columnfilter(
self,
columnfilter: type[ColumnFilter],
columnfilter_kwargs: dict[str, Any],
processor_run_kwargs: Optional[dict[str, Any]] = None,
on_error: OnErrorOptions = "stop",
skip_if_columns_exist: bool = True
) -> None:
if processor_run_kwargs is None:
processor_run_kwargs = {}

stage = FilterPipelineStage(
'columnfilter', filter_class=columnfilter,
filter_kwargs=columnfilter_kwargs, processor_run_kwargs=processor_run_kwargs,
skip_if_columns_exist=skip_if_columns_exist
)

self.stages.append(
PipelineStageRunner(stage, on_error=on_error)
)

def add_shuffle(self) -> None:
stage = ShufflePipelineStage()
self.stages.append(
PipelineStageRunner(stage, on_error="stop")
)

def add_deduplication(
self,
columns: list[str],
on_error: OnErrorOptions = "stop"
) -> None:
stage = DeduplicationPipelineStage(columns)
self.stages.append(
PipelineStageRunner(stage, on_error=on_error)
)

def add_dataframe_filter(
self,
filter_func: Callable[[pd.DataFrame], pd.DataFrame],
on_error: OnErrorOptions = "stop"
) -> None:
stage = DataFramePipelineStage(filter_func)
self.stages.append(
PipelineStageRunner(stage, on_error=on_error)
)

def _log_dataset_info(self, processor: DatasetProcessor) -> None:
self.logger.info(f'Dataset path: {processor.config.path}')
self.logger.info(f'Dataset modalities: {processor.modalities}')
self.logger.info(f'Dataset size: {processor.df.shape}')
self.logger.info(f'Dataset columns: {processor.df.columns}')

def run(self, processor: DatasetProcessor) -> None:
self.logger.info(f'Starting filtering dataset {processor.config.path} with {self.pipeline_name} pipeline')
self._log_dataset_info(processor)
for i, stage_runner in enumerate(self.stages):
self.logger.info("-"*16)
self.logger.info(f"Starting stage {i}: {stage_runner.stage.stage_name}")
try:
stage_runner.stage.run(processor, self.logger)
except Exception as err:
self.logger.exception(f"Error occurred during filtering: {err}")
if stage_runner.on_error == "stop":
self.logger.warning('Stopping pipeline')
raise err
else:
self.logger.warning('Continue')
else:
self.logger.info(f"Pipeline stage finished. New dataframe shape: {processor.df.shape}")