Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions DPF/pipelines/filter_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from DPF.filters import ColumnFilter, DataFilter
from DPF.filters.multigpu_filter import MultiGPUDataFilter
from DPF.processors import DatasetProcessor
from DPF.transforms import BaseFilesTransforms
from DPF.utils.logger import init_logger, init_stdout_logger

from .pipeline_stages import (
Expand All @@ -14,6 +15,7 @@
FilterPipelineStage,
PipelineStage,
ShufflePipelineStage,
TransformPipelineStage,
)
from .types import OnErrorOptions

Expand All @@ -39,25 +41,25 @@ def add_datafilter(
datafilter: type[DataFilter],
datafilter_kwargs: dict[str, Any],
devices: Optional[list[str]] = None,
processor_run_kwargs: Optional[dict[str, Any]] = None,
processor_apply_kwargs: Optional[dict[str, Any]] = None,
on_error: OnErrorOptions = "stop",
skip_if_columns_exist: bool = True
) -> None:
if processor_run_kwargs is None:
processor_run_kwargs = {}
if processor_apply_kwargs is None:
processor_apply_kwargs = {}

if devices is None:
stage = FilterPipelineStage(
'datafilter', filter_class=datafilter,
filter_kwargs=datafilter_kwargs, processor_run_kwargs=processor_run_kwargs,
filter_kwargs=datafilter_kwargs, processor_apply_kwargs=processor_apply_kwargs,
skip_if_columns_exist=skip_if_columns_exist
)
elif len(devices) == 0:
new_kwargs = datafilter_kwargs.copy()
new_kwargs['device'] = devices[0]
stage = FilterPipelineStage(
'datafilter', filter_class=datafilter,
filter_kwargs=new_kwargs, processor_run_kwargs=processor_run_kwargs,
filter_kwargs=new_kwargs, processor_apply_kwargs=processor_apply_kwargs,
skip_if_columns_exist=skip_if_columns_exist
)
else:
Expand All @@ -68,7 +70,7 @@ def add_datafilter(
"datafilter_class": datafilter,
"datafilter_params": datafilter_kwargs
},
processor_run_kwargs=processor_run_kwargs,
processor_apply_kwargs=processor_apply_kwargs,
skip_if_columns_exist=skip_if_columns_exist
)

Expand All @@ -80,16 +82,16 @@ def add_columnfilter(
self,
columnfilter: type[ColumnFilter],
columnfilter_kwargs: dict[str, Any],
processor_run_kwargs: Optional[dict[str, Any]] = None,
processor_apply_kwargs: Optional[dict[str, Any]] = None,
on_error: OnErrorOptions = "stop",
skip_if_columns_exist: bool = True
) -> None:
if processor_run_kwargs is None:
processor_run_kwargs = {}
if processor_apply_kwargs is None:
processor_apply_kwargs = {}

stage = FilterPipelineStage(
'columnfilter', filter_class=columnfilter,
filter_kwargs=columnfilter_kwargs, processor_run_kwargs=processor_run_kwargs,
filter_kwargs=columnfilter_kwargs, processor_apply_kwargs=processor_apply_kwargs,
skip_if_columns_exist=skip_if_columns_exist
)

Expand Down Expand Up @@ -123,6 +125,21 @@ def add_dataframe_filter(
PipelineStageRunner(stage, on_error=on_error)
)

def add_transforms(
    self,
    transforms_class: type[BaseFilesTransforms],
    transforms_kwargs: dict[str, Any],
    processor_apply_kwargs: Optional[dict[str, Any]] = None,
    on_error: OnErrorOptions = "stop"
) -> None:
    """Queue a file-transform stage at the end of the pipeline.

    The transform object itself is instantiated lazily, when the
    stage runs, from ``transforms_class`` and ``transforms_kwargs``.
    ``processor_apply_kwargs`` is forwarded to the processor's
    apply call; ``on_error`` controls whether a stage failure stops
    the pipeline or is skipped.
    """
    transform_stage = TransformPipelineStage(
        transforms_class,
        transforms_kwargs,
        processor_apply_kwargs=processor_apply_kwargs,
    )
    runner = PipelineStageRunner(transform_stage, on_error=on_error)
    self.stages.append(runner)

def _log_dataset_info(self, processor: DatasetProcessor) -> None:
self.logger.info(f'Dataset path: {processor.config.path}')
self.logger.info(f'Dataset modalities: {processor.modalities}')
Expand Down
42 changes: 36 additions & 6 deletions DPF/pipelines/pipeline_stages.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import logging
from abc import ABC, abstractmethod
from typing import Any, Callable, Union
from typing import Any, Callable, Optional, Union

import pandas as pd

from DPF.filters import ColumnFilter, DataFilter
from DPF.filters.multigpu_filter import MultiGPUDataFilter
from DPF.processors import DatasetProcessor
from DPF.transforms import BaseFilesTransforms

from .types import FilterTypes

Expand Down Expand Up @@ -66,13 +67,17 @@ def __init__(
filter_type: FilterTypes,
filter_class: Union[type[DataFilter], type[ColumnFilter], type[MultiGPUDataFilter]],
filter_kwargs: dict[str, Any],
processor_run_kwargs: dict[str, Any],
processor_apply_kwargs: Optional[dict[str, Any]] = None,
skip_if_columns_exist: bool = True
):
self.filter_type = filter_type
self.filter_class = filter_class
self.filter_kwargs = filter_kwargs
self.processor_run_kwargs = processor_run_kwargs

self.processor_apply_kwargs = processor_apply_kwargs
if self.processor_apply_kwargs is None:
self.processor_apply_kwargs = {}

self.skip_if_columns_exist = skip_if_columns_exist

@property
Expand All @@ -96,10 +101,35 @@ def run(self, processor: DatasetProcessor, logger: logging.Logger) -> None:
processor.df.drop(columns=columns_to_be_added, inplace=True, errors='ignore')

if self.filter_type == 'datafilter':
processor.apply_data_filter(filter_obj, **self.processor_run_kwargs) # type: ignore
processor.apply_data_filter(filter_obj, **self.processor_apply_kwargs) # type: ignore
elif self.filter_type == 'columnfilter':
processor.apply_column_filter(filter_obj, **self.processor_run_kwargs) # type: ignore
processor.apply_column_filter(filter_obj, **self.processor_apply_kwargs) # type: ignore
elif self.filter_type == 'multigpufilter':
processor.apply_multi_gpu_data_filter(filter_obj, **self.processor_run_kwargs)
processor.apply_multi_gpu_data_filter(filter_obj, **self.processor_apply_kwargs) # type: ignore
else:
raise ValueError(f"Unknown filter type: {self.filter_type}")


class TransformPipelineStage(PipelineStage):
    """Pipeline stage that applies a files transform to a dataset.

    The transform class is instantiated only when the stage runs, so
    constructing the stage itself is cheap and side-effect free.
    """

    def __init__(
        self,
        transforms_class: type[BaseFilesTransforms],
        transforms_kwargs: dict[str, Any],
        processor_apply_kwargs: Optional[dict[str, Any]] = None,
    ):
        """
        Parameters
        ----------
        transforms_class:
            Class of the transform to instantiate at run time.
        transforms_kwargs:
            Keyword arguments passed to ``transforms_class``.
        processor_apply_kwargs:
            Extra keyword arguments forwarded to
            ``processor.apply_transform``; ``None`` means no extras.
        """
        self.transforms_class = transforms_class
        self.transforms_kwargs = transforms_kwargs
        # Normalize None to {} once, so the attribute is always a plain
        # dict and run() can unpack it unconditionally.
        self.processor_apply_kwargs: dict[str, Any] = (
            processor_apply_kwargs if processor_apply_kwargs is not None else {}
        )

    @property
    def stage_name(self) -> str:
        """Human-readable stage identifier used in pipeline logs."""
        return f"TransformPipelineStage(transforms_class={self.transforms_class}, transforms_kwargs={self.transforms_kwargs})"

    def run(self, processor: DatasetProcessor, logger: logging.Logger) -> None:
        """Instantiate the transform and apply it to the processor's dataset."""
        transforms = self.transforms_class(**self.transforms_kwargs)
        processor.apply_transform(transforms, **self.processor_apply_kwargs)  # type: ignore
1 change: 1 addition & 0 deletions DPF/transforms/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .base_file_transforms import BaseFilesTransforms, TransformsFileData
from .image_resize_transforms import ImageResizeTransforms
from .resizer import Resizer, ResizerModes
from .video_fps_transforms import VideoFPSTransforms
from .video_resize_transforms import VideoResizeTransforms
52 changes: 52 additions & 0 deletions DPF/transforms/video_fps_transforms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import shutil
import subprocess
import uuid

from DPF.transforms.base_file_transforms import (
BaseFilesTransforms,
PoolOptions,
TransformsFileData,
)
from DPF.transforms.video_resize_transforms import is_ffmpeg_installed


class VideoFPSTransforms(BaseFilesTransforms):
    """Transform that re-encodes videos to a target frame rate with ffmpeg.

    A video is re-encoded only when its current fps differs from the
    target by more than ``eps``; otherwise the file is left untouched.
    """

    def __init__(
        self,
        fps: int,
        eps: float = 0.1,
        pool_type: PoolOptions = 'threads',
        workers: int = 16,
        pbar: bool = True
    ):
        """
        Parameters
        ----------
        fps:
            Target frame rate.
        eps:
            Tolerance band around ``fps`` within which a video is
            considered already at the target rate.
        pool_type, workers, pbar:
            Forwarded to :class:`BaseFilesTransforms`.
        """
        super().__init__(pool_type, workers, pbar)
        self.fps = fps
        self.eps = eps

        assert is_ffmpeg_installed(), "Install ffmpeg first"

    @property
    def required_metadata(self) -> list[str]:
        # Needs the current fps to decide whether re-encoding is required.
        return ['fps']

    @property
    def metadata_to_change(self) -> list[str]:
        return ['fps']

    @property
    def modality(self) -> str:
        return 'video'

    def _process_filepath(self, data: TransformsFileData) -> TransformsFileData:
        """Re-encode a single video to ``self.fps`` if it is outside the eps band."""
        filepath = data.filepath
        ext = filepath.split('.')[-1]
        video_fps = data.metadata['fps']

        # Equivalent to: fps < target - eps or fps > target + eps.
        if abs(video_fps - self.fps) > self.eps:
            temp_filename = str(uuid.uuid4()) + '.' + ext
            # Pass arguments as a list with shell=False so filepaths containing
            # spaces or shell metacharacters are handled safely (the previous
            # f-string + shell=True form broke on such paths and was injectable).
            ffmpeg_args = [
                'ffmpeg', '-hide_banner',
                '-i', filepath,
                '-vf', f'fps={self.fps}',
                temp_filename, '-y',
            ]
            subprocess.run(ffmpeg_args, capture_output=True, check=True)
            # Atomically-ish replace the original with the re-encoded file.
            shutil.move(temp_filename, filepath)

        return TransformsFileData(filepath, {'fps': self.fps})
6 changes: 3 additions & 3 deletions DPF/transforms/video_resize_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ def __init__(
self,
resizer: Resizer,
ffmpeg_preset: str = 'fast',
pool_type: PoolOptions = 'processes',
pool_type: PoolOptions = 'threads',
workers: int = 16,
pbar: bool = True
):
super().__init__(pool_type, workers, pbar)
self.resizer = resizer
self.ffmpeg_preset = ffmpeg_preset

assert is_ffmpeg_installed(), "Please install ffmpeg"
assert is_ffmpeg_installed(), "Install ffmpeg first"

@property
def required_metadata(self) -> list[str]:
Expand All @@ -57,7 +57,7 @@ def _process_filepath(self, data: TransformsFileData) -> TransformsFileData:
new_width += new_width % 2
new_height += new_height % 2
temp_filename = str(uuid.uuid4())+'.'+ext
ffmpeg_command = f'ffmpeg -i {filepath} -preset {self.ffmpeg_preset} -vf "scale={new_width}:{new_height}" {temp_filename} -y'
ffmpeg_command = f'ffmpeg -hide_banner -i {filepath} -preset {self.ffmpeg_preset} -vf "scale={new_width}:{new_height}" {temp_filename} -y'
subprocess.run(ffmpeg_command, shell=True, capture_output=True, check=True)
shutil.move(temp_filename, filepath)

Expand Down