From ffae303d3e0a08e0b01a23ff77944ff06aa491d3 Mon Sep 17 00:00:00 2001
From: boomb0om
Date: Mon, 6 May 2024 00:02:30 +0300
Subject: [PATCH 1/2] feat: add video fps transform

---
 DPF/transforms/__init__.py                |  1 +
 DPF/transforms/video_fps_transforms.py    | 52 +++++++++++++++++++++++
 DPF/transforms/video_resize_transforms.py |  6 +--
 3 files changed, 56 insertions(+), 3 deletions(-)
 create mode 100644 DPF/transforms/video_fps_transforms.py

diff --git a/DPF/transforms/__init__.py b/DPF/transforms/__init__.py
index 88be354..78b6a0b 100644
--- a/DPF/transforms/__init__.py
+++ b/DPF/transforms/__init__.py
@@ -1,4 +1,5 @@
 from .base_file_transforms import BaseFilesTransforms, TransformsFileData
 from .image_resize_transforms import ImageResizeTransforms
 from .resizer import Resizer, ResizerModes
+from .video_fps_transforms import VideoFPSTransforms
 from .video_resize_transforms import VideoResizeTransforms
diff --git a/DPF/transforms/video_fps_transforms.py b/DPF/transforms/video_fps_transforms.py
new file mode 100644
index 0000000..410f26d
--- /dev/null
+++ b/DPF/transforms/video_fps_transforms.py
@@ -0,0 +1,52 @@
+import shutil
+import subprocess
+import uuid
+
+from DPF.transforms.base_file_transforms import (
+    BaseFilesTransforms,
+    PoolOptions,
+    TransformsFileData,
+)
+from DPF.transforms.video_resize_transforms import is_ffmpeg_installed
+
+
+class VideoFPSTransforms(BaseFilesTransforms):
+
+    def __init__(
+        self,
+        fps: int,
+        eps: float = 0.1,
+        pool_type: PoolOptions = 'threads',
+        workers: int = 16,
+        pbar: bool = True
+    ):
+        super().__init__(pool_type, workers, pbar)
+        self.fps = fps
+        self.eps = eps
+
+        assert is_ffmpeg_installed(), "Install ffmpeg first"
+
+    @property
+    def required_metadata(self) -> list[str]:
+        return ['fps']
+
+    @property
+    def metadata_to_change(self) -> list[str]:
+        return ['fps']
+
+    @property
+    def modality(self) -> str:
+        return 'video'
+
+    def _process_filepath(self, data: TransformsFileData) -> TransformsFileData:
+        filepath = data.filepath
+        ext = filepath.split('.')[-1]
+        video_fps = data.metadata['fps']
+
+        if (video_fps < (self.fps - self.eps)) or (video_fps > (self.fps + self.eps)):
+            temp_filename = str(uuid.uuid4()) + '.' + ext
+            ffmpeg_command = f'ffmpeg -hide_banner -i {filepath} -vf fps={self.fps} {temp_filename} -y'
+            subprocess.run(ffmpeg_command, shell=True, capture_output=True, check=True)
+            shutil.move(temp_filename, filepath)
+
+        return TransformsFileData(filepath, {'fps': self.fps})
diff --git a/DPF/transforms/video_resize_transforms.py b/DPF/transforms/video_resize_transforms.py
index 5ee4b6b..460a149 100644
--- a/DPF/transforms/video_resize_transforms.py
+++ b/DPF/transforms/video_resize_transforms.py
@@ -24,7 +24,7 @@ def __init__(
         self,
         resizer: Resizer,
         ffmpeg_preset: str = 'fast',
-        pool_type: PoolOptions = 'processes',
+        pool_type: PoolOptions = 'threads',
         workers: int = 16,
         pbar: bool = True
     ):
@@ -32,7 +32,7 @@ def __init__(
         self.resizer = resizer
         self.ffmpeg_preset = ffmpeg_preset
 
-        assert is_ffmpeg_installed(), "Please install ffmpeg"
+        assert is_ffmpeg_installed(), "Install ffmpeg first"
 
     @property
     def required_metadata(self) -> list[str]:
@@ -57,7 +57,7 @@ def _process_filepath(self, data: TransformsFileData) -> TransformsFileData:
             new_width += new_width % 2
             new_height += new_height % 2
         temp_filename = str(uuid.uuid4())+'.'+ext
-        ffmpeg_command = f'ffmpeg -i {filepath} -preset {self.ffmpeg_preset} -vf "scale={new_width}:{new_height}" {temp_filename} -y'
+        ffmpeg_command = f'ffmpeg -hide_banner -i {filepath} -preset {self.ffmpeg_preset} -vf "scale={new_width}:{new_height}" {temp_filename} -y'
         subprocess.run(ffmpeg_command, shell=True, capture_output=True, check=True)
         shutil.move(temp_filename, filepath)
 
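A minimal usage sketch of the transform added in this patch. The `processor` below is a stand-in for a DatasetProcessor prepared elsewhere in DPF (not shown here); everything else appears in the patch:

    from DPF.transforms import VideoFPSTransforms

    # Retime anything outside the [fps - eps, fps + eps] band: with fps=24 and
    # eps=0.1, a 23.95 fps clip is left untouched (23.9 <= 23.95 <= 24.1),
    # while a 30 fps clip is rewritten in place via ffmpeg's fps filter.
    transforms = VideoFPSTransforms(fps=24, eps=0.1, pool_type='threads', workers=16)

    # The dataset's 'fps' metadata must already exist (required_metadata above).
    # apply_transform is the same hook TransformPipelineStage.run uses in patch 2.
    processor.apply_transform(transforms)
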
From 23de547b2aefa7f48d4bd1ebe35e730e9ab5e97e Mon Sep 17 00:00:00 2001
From: boomb0om
Date: Mon, 6 May 2024 10:44:22 +0300
Subject: [PATCH 2/2] feat: add transforms in pipelines

---
 DPF/pipelines/filter_pipeline.py | 37 ++++++++++++++++++++--------
 DPF/pipelines/pipeline_stages.py | 42 +++++++++++++++++++++++++++-----
 2 files changed, 63 insertions(+), 16 deletions(-)

diff --git a/DPF/pipelines/filter_pipeline.py b/DPF/pipelines/filter_pipeline.py
index 55aa8f0..91d2825 100644
--- a/DPF/pipelines/filter_pipeline.py
+++ b/DPF/pipelines/filter_pipeline.py
@@ -6,6 +6,7 @@
 from DPF.filters import ColumnFilter, DataFilter
 from DPF.filters.multigpu_filter import MultiGPUDataFilter
 from DPF.processors import DatasetProcessor
+from DPF.transforms import BaseFilesTransforms
 from DPF.utils.logger import init_logger, init_stdout_logger
 
 from .pipeline_stages import (
@@ -14,6 +15,7 @@
     FilterPipelineStage,
     PipelineStage,
     ShufflePipelineStage,
+    TransformPipelineStage,
 )
 from .types import OnErrorOptions
 
@@ -39,17 +41,17 @@ def add_datafilter(
         datafilter: type[DataFilter],
         datafilter_kwargs: dict[str, Any],
         devices: Optional[list[str]] = None,
-        processor_run_kwargs: Optional[dict[str, Any]] = None,
+        processor_apply_kwargs: Optional[dict[str, Any]] = None,
         on_error: OnErrorOptions = "stop",
         skip_if_columns_exist: bool = True
     ) -> None:
-        if processor_run_kwargs is None:
-            processor_run_kwargs = {}
+        if processor_apply_kwargs is None:
+            processor_apply_kwargs = {}
 
         if devices is None:
             stage = FilterPipelineStage(
                 'datafilter', filter_class=datafilter,
-                filter_kwargs=datafilter_kwargs, processor_run_kwargs=processor_run_kwargs,
+                filter_kwargs=datafilter_kwargs, processor_apply_kwargs=processor_apply_kwargs,
                 skip_if_columns_exist=skip_if_columns_exist
             )
         elif len(devices) == 1:
@@ -57,7 +59,7 @@ def add_datafilter(
             new_kwargs['device'] = devices[0]
             stage = FilterPipelineStage(
                 'datafilter', filter_class=datafilter,
-                filter_kwargs=new_kwargs, processor_run_kwargs=processor_run_kwargs,
+                filter_kwargs=new_kwargs, processor_apply_kwargs=processor_apply_kwargs,
                 skip_if_columns_exist=skip_if_columns_exist
             )
         else:
@@ -68,7 +70,7 @@ def add_datafilter(
                     "datafilter_class": datafilter,
                     "datafilter_params": datafilter_kwargs
                 },
-                processor_run_kwargs=processor_run_kwargs,
+                processor_apply_kwargs=processor_apply_kwargs,
                 skip_if_columns_exist=skip_if_columns_exist
             )
 
@@ -80,16 +82,16 @@ def add_columnfilter(
         self,
         columnfilter: type[ColumnFilter],
         columnfilter_kwargs: dict[str, Any],
-        processor_run_kwargs: Optional[dict[str, Any]] = None,
+        processor_apply_kwargs: Optional[dict[str, Any]] = None,
         on_error: OnErrorOptions = "stop",
         skip_if_columns_exist: bool = True
     ) -> None:
-        if processor_run_kwargs is None:
-            processor_run_kwargs = {}
+        if processor_apply_kwargs is None:
+            processor_apply_kwargs = {}
 
         stage = FilterPipelineStage(
             'columnfilter', filter_class=columnfilter,
-            filter_kwargs=columnfilter_kwargs, processor_run_kwargs=processor_run_kwargs,
+            filter_kwargs=columnfilter_kwargs, processor_apply_kwargs=processor_apply_kwargs,
             skip_if_columns_exist=skip_if_columns_exist
         )
 
@@ -123,6 +125,21 @@ def add_dataframe_filter(
             PipelineStageRunner(stage, on_error=on_error)
         )
 
+    def add_transforms(
+        self,
+        transforms_class: type[BaseFilesTransforms],
+        transforms_kwargs: dict[str, Any],
+        processor_apply_kwargs: Optional[dict[str, Any]] = None,
+        on_error: OnErrorOptions = "stop"
+    ) -> None:
+        stage = TransformPipelineStage(
+            transforms_class, transforms_kwargs,
+            processor_apply_kwargs=processor_apply_kwargs
+        )
+        self.stages.append(
+            PipelineStageRunner(stage, on_error=on_error)
+        )
+
     def _log_dataset_info(self, processor: DatasetProcessor) -> None:
         self.logger.info(f'Dataset path: {processor.config.path}')
         self.logger.info(f'Dataset modalities: {processor.modalities}')
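The add_transforms hook above is the user-facing entry point; a hedged sketch of how a pipeline would call it follows. The FilterPipeline import path, constructor arguments, and run() call are assumed from the surrounding DPF codebase rather than shown in this diff:

    from DPF.pipelines import FilterPipeline
    from DPF.transforms import VideoFPSTransforms

    pipeline = FilterPipeline("video_preprocessing")  # constructor args assumed
    pipeline.add_transforms(
        VideoFPSTransforms,
        transforms_kwargs={'fps': 24, 'eps': 0.1},
        on_error="stop",  # same OnErrorOptions the filter stages accept
    )
    pipeline.run(processor)  # run() assumed; stages execute in insertion order
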
diff --git a/DPF/pipelines/pipeline_stages.py b/DPF/pipelines/pipeline_stages.py
index 7f489a2..dc1f01b 100644
--- a/DPF/pipelines/pipeline_stages.py
+++ b/DPF/pipelines/pipeline_stages.py
@@ -1,12 +1,13 @@
 import logging
 from abc import ABC, abstractmethod
-from typing import Any, Callable, Union
+from typing import Any, Callable, Optional, Union
 
 import pandas as pd
 
 from DPF.filters import ColumnFilter, DataFilter
 from DPF.filters.multigpu_filter import MultiGPUDataFilter
 from DPF.processors import DatasetProcessor
+from DPF.transforms import BaseFilesTransforms
 
 from .types import FilterTypes
 
@@ -66,13 +67,17 @@ def __init__(
         filter_type: FilterTypes,
         filter_class: Union[type[DataFilter], type[ColumnFilter], type[MultiGPUDataFilter]],
         filter_kwargs: dict[str, Any],
-        processor_run_kwargs: dict[str, Any],
+        processor_apply_kwargs: Optional[dict[str, Any]] = None,
         skip_if_columns_exist: bool = True
     ):
         self.filter_type = filter_type
         self.filter_class = filter_class
         self.filter_kwargs = filter_kwargs
-        self.processor_run_kwargs = processor_run_kwargs
+
+        self.processor_apply_kwargs = processor_apply_kwargs
+        if self.processor_apply_kwargs is None:
+            self.processor_apply_kwargs = {}
+
         self.skip_if_columns_exist = skip_if_columns_exist
 
     @property
@@ -96,10 +101,35 @@ def run(self, processor: DatasetProcessor, logger: logging.Logger) -> None:
             processor.df.drop(columns=columns_to_be_added, inplace=True, errors='ignore')
 
         if self.filter_type == 'datafilter':
-            processor.apply_data_filter(filter_obj, **self.processor_run_kwargs) # type: ignore
+            processor.apply_data_filter(filter_obj, **self.processor_apply_kwargs) # type: ignore
         elif self.filter_type == 'columnfilter':
-            processor.apply_column_filter(filter_obj, **self.processor_run_kwargs) # type: ignore
+            processor.apply_column_filter(filter_obj, **self.processor_apply_kwargs) # type: ignore
         elif self.filter_type == 'multigpufilter':
-            processor.apply_multi_gpu_data_filter(filter_obj, **self.processor_run_kwargs)
+            processor.apply_multi_gpu_data_filter(filter_obj, **self.processor_apply_kwargs) # type: ignore
         else:
             raise ValueError(f"Unknown filter type: {self.filter_type}")
+
+
+class TransformPipelineStage(PipelineStage):
+
+    def __init__(
+        self,
+        transforms_class: type[BaseFilesTransforms],
+        transforms_kwargs: dict[str, Any],
+        processor_apply_kwargs: Optional[dict[str, Any]] = None,
+    ):
+        self.transforms_class = transforms_class
+        self.transforms_kwargs = transforms_kwargs
+
+        self.processor_apply_kwargs = processor_apply_kwargs
+        if self.processor_apply_kwargs is None:
+            self.processor_apply_kwargs = {}
+
+    @property
+    def stage_name(self) -> str:
+        return f"TransformPipelineStage(transforms_class={self.transforms_class}, transforms_kwargs={self.transforms_kwargs})"
+
+    def run(self, processor: DatasetProcessor, logger: logging.Logger) -> None:
+        transforms = self.transforms_class(**self.transforms_kwargs)
+
+        processor.apply_transform(transforms, **self.processor_apply_kwargs) # type: ignore
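
For completeness, the new stage can also be driven directly, mirroring how FilterPipelineStage is exercised. Everything below appears in this patch except `processor` (a DatasetProcessor from elsewhere in DPF) and the stand-in logger:

    import logging

    from DPF.pipelines.pipeline_stages import TransformPipelineStage
    from DPF.transforms import VideoFPSTransforms

    # Built exactly as FilterPipeline.add_transforms does above.
    stage = TransformPipelineStage(VideoFPSTransforms, {'fps': 24})

    # DPF normally wires loggers via init_logger/init_stdout_logger; a plain
    # logging.Logger stands in here.
    logger = logging.getLogger("dpf.pipeline")

    # run() instantiates the transform class, then calls processor.apply_transform.
    stage.run(processor, logger)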