# Running and Optimizing Data Processing Pipelines

## What are Data Processing Pipelines?

A data processing pipeline in Data-Juicer is a workflow that executes a series of data processing operations. These pipelines are defined via configuration files (recipes) that specify the operators to run and their execution order. The pipeline applies these operators to the dataset one after another, producing a cleaned, filtered, high-quality dataset.
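
Conceptually, a pipeline is an ordered list of operators applied to every sample. For example, mappers edit samples while filters decide whether to keep them. The plain-Python sketch below illustrates only that idea. The function names and the convention of returning `None` to drop a sample are hypothetical and are not Data-Juicer's API.

```python
from typing import Callable, Dict, List, Optional

# Hypothetical operator type: returns the (possibly edited) sample, or None to drop it
Op = Callable[[Dict], Optional[Dict]]


def strip_whitespace(sample: Dict) -> Optional[Dict]:
    """Mapper-like op: edits the sample's text."""
    return {**sample, "text": sample["text"].strip()}


def min_words(n: int) -> Op:
    """Filter-like op factory: keeps samples with at least n words."""
    def op(sample: Dict) -> Optional[Dict]:
        return sample if len(sample["text"].split()) >= n else None
    return op


def run_pipeline(samples: List[Dict], ops: List[Op]) -> List[Dict]:
    """Apply each op, in recipe order, to every sample that is still present."""
    for op in ops:
        results = (op(s) for s in samples)
        samples = [s for s in results if s is not None]
    return samples


data = [{"text": "  a short one  "}, {"text": "this sample has enough words to keep"}]
print(run_pipeline(data, [strip_whitespace, min_words(4)]))
# [{'text': 'this sample has enough words to keep'}]
```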

## In This Notebook

1. Different ways to run data processing pipelines
2. Understanding the executor process
3. Performance optimization techniques with practical examples

## Methods to Run Data Processing Pipelines

Data-Juicer provides multiple ways to run data processing pipelines, including command-line tools and programming interfaces.

First, copy the demo configuration file and the demo dataset into the local `configs/demo/` and `demos/data/` directories.

In [None]:
# Copy the demo recipe and demo dataset into local directories
%mkdir -p configs/demo
%mkdir -p demos/data
%cp ../configs/demo/process.yaml configs/demo/
%cp ../demos/data/demo-dataset.jsonl demos/data/

### 1. Running via Command Line

This is the most common approach. You can directly execute the configured processing pipeline using the command-line tool.

Here, [configs/demo/process.yaml](https://github.com/modelscope/data-juicer/blob/main/configs/demo/process.yaml) is the data recipe to run.

In [None]:
# Using the recommended dj-process command line tool
!dj-process --config configs/demo/process.yaml

# Or, if installed from source, using Python script
# !python tools/process_data.py --config configs/demo/process.yaml

The [process_data.py](https://github.com/modelscope/data-juicer/blob/main/tools/process_data.py) script calls the executor's `run()` method to process the data. It also supports Ray mode.

### 2. Running Programmatically

Besides command-line tools, you can also run a data processing pipeline directly in Python code.

Let's see how to run a pipeline programmatically:

In [None]:
# Initialize the configuration from the recipe file
from data_juicer.config import init_configs
cfg = init_configs(['--config', 'configs/demo/process.yaml'])

In [None]:
from data_juicer.core import DefaultExecutor

# Create and run executor
executor = DefaultExecutor(cfg)
dataset = executor.run()

print("\nProcessed samples:")
for i, sample in enumerate(dataset):
    print(f"{i+1}. {sample['text']}")

## Understanding the Executor Process

Let's break down what happens when we run the executor. The DefaultExecutor.run() method in [default_executor.py](https://github.com/modelscope/data-juicer/blob/main/data_juicer/core/executor/default_executor.py) performs several key steps:

### 1. Loading and Formatting Data

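First, the method obtains and formats the dataset from one of three sources: a dataset passed directly to `run()`, a checkpoint saved by a previous run (when checkpointing is enabled and a checkpoint is available), or the dataset builder, which loads the data specified in the recipe.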

```python
    def run(
        self,
        dataset: Union[Dataset, NestedDataset] = None,
        load_data_np: Optional[PositiveInt] = None,
        skip_export: bool = False,
        skip_return: bool = False,
    ):
        ...
        # 1. format data
        if dataset is not None:
            logger.info(f"Using existing dataset {dataset}")
        elif self.cfg.use_checkpoint and self.ckpt_manager.ckpt_available:
            logger.info("Loading dataset from checkpoint...")
            dataset = self.ckpt_manager.load_ckpt()
        else:
            logger.info("Loading dataset from dataset builder...")
            if load_data_np is None:
                load_data_np = self.np
            dataset = self.dataset_builder.load_dataset(num_proc=load_data_np)
        ...
```

Run the cell below to inspect the loaded dataset interactively.

In [None]:
loaded_dataset = executor.dataset_builder.load_dataset(num_proc=cfg.np)
for sample in loaded_dataset:
    print(sample)
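
The precedence in step 1 can be restated as two small pure functions. This is a simplified sketch of the branching shown in the `run()` excerpt, not Data-Juicer code; the function names are hypothetical.

```python
from typing import Optional


def choose_data_source(dataset: Optional[object], use_checkpoint: bool, ckpt_available: bool) -> str:
    """An explicitly passed dataset wins, then a usable checkpoint,
    and only then is the dataset built from the recipe."""
    if dataset is not None:
        return "existing"
    if use_checkpoint and ckpt_available:
        return "checkpoint"
    return "builder"


def resolve_num_proc(load_data_np: Optional[int], cfg_np: int) -> int:
    """Fall back to the recipe-level `np` when no override is given."""
    return cfg_np if load_data_np is None else load_data_np


print(choose_data_source(None, use_checkpoint=True, ckpt_available=False))  # builder
print(resolve_num_proc(None, cfg_np=4))  # 4
```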

### 2. Loading Operators

Next, the method loads the operators from the given configuration file.

```python
    def run(self, dataset=None, load_data_np=None, skip_export=False, skip_return=False):
        ...
        # 2. extract processes
        logger.info("Preparing process operators...")
        ops = load_ops(self.cfg.process)
        ...
```

You can run the code below to see the ops.

In [None]:
from data_juicer.ops import load_ops
ops = load_ops(cfg.process)
for op in ops:
    print(op.__class__.__name__)
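
`cfg.process` holds the recipe's `process` list, where each entry maps one operator name to its arguments. The following is a minimal sketch of turning such a list into operator objects through a name registry. The registry, the decorator, and the class bodies are hypothetical stand-ins, not Data-Juicer's `load_ops` implementation.

```python
from typing import Any, Dict, List, Optional

# Hypothetical registry mapping recipe operator names to classes
OPERATORS: Dict[str, type] = {}


def register(name: str):
    """Class decorator that records a class under a recipe name."""
    def wrap(cls: type) -> type:
        OPERATORS[name] = cls
        return cls
    return wrap


@register("clean_html_mapper")
class CleanHtmlMapper:
    pass


@register("words_num_filter")
class WordsNumFilter:
    def __init__(self, lang: str = "en", min_num: int = 10, max_num: int = 10_000):
        self.lang, self.min_num, self.max_num = lang, min_num, max_num


def load_ops_sketch(process: List[Dict[str, Optional[Dict[str, Any]]]]) -> list:
    """Instantiate one op per `- op_name: {args}` entry, preserving recipe order."""
    ops = []
    for entry in process:
        (name, args), = entry.items()  # each entry holds exactly one key
        ops.append(OPERATORS[name](**(args or {})))
    return ops


process_cfg = [
    {"clean_html_mapper": None},
    {"words_num_filter": {"lang": "en", "min_num": 5, "max_num": 1000}},
]
for op in load_ops_sketch(process_cfg):
    print(type(op).__name__)
# CleanHtmlMapper
# WordsNumFilter
```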

### 3. Processing Data

Then, the method applies the loaded operators to the data. All data samples are processed through the list of operators.

```python
    def run(self, dataset=None, load_data_np=None, skip_export=False, skip_return=False):
        ...
        # 3. data process
        # - If tracer is open, trace each op after it's processed
        # - If checkpoint is open, clean the cache files after each process
        logger.info("Processing data...")
        tstart = time()
        dataset = dataset.process(
            ops,
            work_dir=self.work_dir,
            exporter=self.exporter,
            checkpointer=self.ckpt_manager,
            tracer=self.tracer,
            adapter=self.adapter,
            open_monitor=self.cfg.open_monitor,
        )
        tend = time()
        logger.info(f"All OPs are done in {tend - tstart:.3f}s.")
        ...
```

You can run the same step interactively on the dataset and operators loaded in the previous cells:

In [None]:
dataset = loaded_dataset.process(
    ops,
    work_dir=cfg.work_dir,
    exporter=executor.exporter,
    checkpointer=executor.ckpt_manager,
    tracer=executor.tracer,
    adapter=executor.adapter,
    open_monitor=cfg.open_monitor
)

for sample in dataset:
    print(sample)
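
Filters in Data-Juicer typically work in two stages: they first compute a per-sample statistic, then decide from it whether to keep the sample. Those statistics are roughly what later ends up in the `_stats` output file. The sketch below imitates that two-stage pattern in plain Python. The class, the method bodies, and the `__stats__` key are illustrative assumptions, not the library's code.

```python
from typing import Dict, List

STATS_KEY = "__stats__"  # hypothetical stand-in for a reserved stats field


class MinWordsFilter:
    """Filter-style op: compute a statistic, then keep or drop based on it."""

    def __init__(self, min_num: int):
        self.min_num = min_num

    def compute_stats(self, sample: Dict) -> Dict:
        stats = dict(sample.get(STATS_KEY, {}))
        stats["num_words"] = len(sample["text"].split())
        return {**sample, STATS_KEY: stats}

    def process(self, sample: Dict) -> bool:
        return sample[STATS_KEY]["num_words"] >= self.min_num


def apply_filters(samples: List[Dict], filters: List[MinWordsFilter]) -> List[Dict]:
    for f in filters:
        samples = [f.compute_stats(s) for s in samples]  # stage 1: statistics
        samples = [s for s in samples if f.process(s)]   # stage 2: keep/drop
    return samples


kept = apply_filters(
    [{"text": "too short"}, {"text": "long enough to be kept"}],
    [MinWordsFilter(min_num=3)],
)
print(kept)
# [{'text': 'long enough to be kept', '__stats__': {'num_words': 5}}]
```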

### 4. Exporting Data

Finally, the method exports the processed dataset to the specified path.

```python
    def run(self, dataset=None, load_data_np=None, skip_export=False, skip_return=False):
        ...
        # 4. data export
        if not skip_export:
            logger.info('Exporting dataset to disk...')
            self.exporter.export(dataset)
        ...
```

You can find the processed dataset and its statistics under the export path specified in [configs/demo/process.yaml](https://github.com/modelscope/data-juicer/blob/main/configs/demo/process.yaml):

```yaml
export_path: './outputs/demo-process/demo-processed.jsonl'
```

After processing, Data-Juicer generates two main output files:
- **demo-processed.jsonl**: The processed dataset
- **demo-processed_stats.jsonl**: Processing statistics and metadata

In [None]:
cfg.export_path
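
The naming convention of the two output files can be captured in a few lines. This is a minimal sketch. It assumes the stats file name is the export file name with `_stats` inserted before the extension, as in the `demo-processed` example above. It also assumes stats are stored under a hypothetical `__stats__` key. Data-Juicer's own exporter supports more formats and options than this.

```python
import json
import os
import tempfile
from typing import Dict, List

STATS_KEY = "__stats__"  # hypothetical name for the per-sample stats field


def stats_path_for(export_path: str) -> str:
    """demo-processed.jsonl -> demo-processed_stats.jsonl"""
    stem, ext = os.path.splitext(export_path)
    return f"{stem}_stats{ext}"


def export_jsonl(samples: List[Dict], export_path: str) -> None:
    """Write samples (minus stats) to export_path and their stats to the companion file."""
    os.makedirs(os.path.dirname(export_path) or ".", exist_ok=True)
    with open(export_path, "w", encoding="utf-8") as data_f, \
            open(stats_path_for(export_path), "w", encoding="utf-8") as stats_f:
        for s in samples:
            record = {k: v for k, v in s.items() if k != STATS_KEY}
            data_f.write(json.dumps(record, ensure_ascii=False) + "\n")
            stats_f.write(json.dumps({STATS_KEY: s.get(STATS_KEY, {})}) + "\n")


out_path = os.path.join(tempfile.mkdtemp(), "demo-process", "demo-processed.jsonl")
export_jsonl([{"text": "kept sample", STATS_KEY: {"num_words": 2}}], out_path)
print(sorted(os.listdir(os.path.dirname(out_path))))
# ['demo-processed.jsonl', 'demo-processed_stats.jsonl']
```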

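## Performance Optimization Tips

The examples below first create a small synthetic dataset of 500 samples that mixes repetitive, normal, and very short texts. They then process it with the same four text filters under different settings.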

In [None]:
import json
import tempfile

# Create 500 synthetic samples in three groups, each aimed at a different filter
sample_data = []
for i in range(500):
    if i % 3 == 0:
        # Highly repetitive text, a target for word_repetition_filter
        sample_data.append({"text": "word word word word word word word word word word " * 5})
    elif i % 3 == 1:
        # Normal English sentence, expected to pass the filters
        sample_data.append({"text": f"This is a high quality English text with appropriate length and good content for sample {i}."})
    else:
        # Only four words, below the words_num_filter minimum of 5
        sample_data.append({"text": "short text but pneumonoultramicroscopicsilicovolcanoconiosis"})

# Write the samples to a temporary JSONL file
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.jsonl') as f:
    for item in sample_data:
        f.write(json.dumps(item) + '\n')
    temp_data_path = f.name
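
The recipes that follow use `word_repetition_filter` with `rep_len: 5` and `max_ratio: 0.5`. To see why the repetitive samples are expected to be dropped, here is a simplified re-computation of a word n-gram repetition ratio. It assumes plain whitespace tokenization. It defines the ratio as the fraction of n-grams that occur more than once. Data-Juicer's operator uses its own tokenization, so treat this as an approximation.

```python
from collections import Counter


def word_repetition_ratio(text: str, rep_len: int = 5) -> float:
    """Fraction of word n-grams (n = rep_len) that occur more than once in the text."""
    words = text.split()  # simplified whitespace tokenization (assumption)
    ngrams = [" ".join(words[i:i + rep_len]) for i in range(len(words) - rep_len + 1)]
    if not ngrams:
        return 0.0
    counts = Counter(ngrams)
    repeated = sum(c for c in counts.values() if c > 1)
    return repeated / len(ngrams)


repetitive = "word word word word word word word word word word " * 5
normal = "This is a high quality English text with appropriate length and good content for sample 1."
print(word_repetition_ratio(repetitive))  # 1.0
print(word_repetition_ratio(normal))      # 0.0
```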

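### 1. Parallel Processing with `np`

The `np` option sets the number of subprocesses used to process the dataset. The recipe below applies four text filters with `np: 4`. On a dataset this small, process start-up and model loading can outweigh the gain, so treat the measured time as indicative only.

In [None]: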
import time
from data_juicer.config import init_configs
from data_juicer.core import DefaultExecutor

# Recipe with parallel processing
parallel_recipe = f"""
project_name: 'np_example'
dataset_path: '{temp_data_path}'
export_path: './outputs/np_dataset.jsonl'
np: 4

process:
  - language_id_score_filter:
      lang: 'en'
      min_score: 0.8
  - words_num_filter:
      lang: 'en'
      min_num: 5
      max_num: 1000
  - word_repetition_filter:
      lang: 'en'
      max_ratio: 0.5
      rep_len: 5
  - stopwords_filter:
      lang: 'en'
      min_ratio: 0.1
"""

# Write recipe to file
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
    f.write(parallel_recipe)
    f.flush()
    parallel_config_path = f.name

print("Running with 4 parallel processes...")
start_time = time.time()
cfg = init_configs(['--config', parallel_config_path], load_configs_only=True)
executor = DefaultExecutor(cfg)
parallel_dataset = executor.run()
parallel_time = time.time() - start_time
print(f"Parallel processing completed in {parallel_time:.2f} seconds")
print(f"Processed {len(parallel_dataset)} samples")
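
Parallelism of this kind follows a split, process, merge pattern. Hugging Face `datasets`, on which Data-Juicer's local processing is built, handles `num_proc` in three steps. It splits the dataset into contiguous shards, processes each shard in a separate process, and concatenates the results in order. The sketch below imitates that pattern. The helper names are hypothetical, and threads stand in for processes to keep the example portable.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Tuple


def contiguous_shard_bounds(n: int, num_shards: int) -> List[Tuple[int, int]]:
    """Split range(n) into num_shards contiguous [start, end) slices
    whose sizes differ by at most one."""
    div, mod = divmod(n, num_shards)
    bounds = []
    for i in range(num_shards):
        start = div * i + min(i, mod)
        end = start + div + (1 if i < mod else 0)
        bounds.append((start, end))
    return bounds


def parallel_map(items: list, fn: Callable, num_workers: int) -> list:
    """Process each shard in its own worker, then concatenate in shard order."""
    shards = [items[s:e] for s, e in contiguous_shard_bounds(len(items), num_workers)]
    # Threads stand in for the worker processes a real pipeline would use
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        results = pool.map(lambda shard: [fn(x) for x in shard], shards)
    return [y for shard_result in results for y in shard_result]


print(contiguous_shard_bounds(10, 4))  # [(0, 3), (3, 6), (6, 8), (8, 10)]
print(parallel_map(list(range(10)), lambda x: x * x, num_workers=4))
```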

### 2. Operator Fusion for Reduced Memory Usage

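Operator fusion combines multiple operators into a single processing step. The dataset is then traversed once instead of once per operator, which reduces intermediate data storage and memory usage. It is especially beneficial when operators share common computations, such as splitting text into words. Below, the four filters from the previous example are wrapped in a single `general_fused_op`, and the two runs are compared. The earlier run may also include one-time costs such as downloading the language identification model, which can inflate its timing. Let's compare processing with and without operator fusion: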

In [None]:
# Recipe with operator fusion
fusion_recipe = f"""
project_name: 'fusion_example'
dataset_path: '{temp_data_path}'
export_path: './outputs/fusion_dataset.jsonl'
np: 4

process:
  - general_fused_op:
      fused_op_list:
        - language_id_score_filter:
            lang: 'en'
            min_score: 0.8
        - words_num_filter:
            lang: 'en'
            min_num: 5
            max_num: 1000
        - word_repetition_filter:
            lang: 'en'
            max_ratio: 0.5
            rep_len: 5
        - stopwords_filter:
            lang: 'en'
            min_ratio: 0.1
"""


# Run with fusion
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
    f.write(fusion_recipe)
    f.flush()
    fusion_config_path = f.name

print("Running with operator fusion...")
start_time = time.time()
cfg = init_configs(['--config', fusion_config_path], load_configs_only=True)
executor = DefaultExecutor(cfg)
dataset_fusion = executor.run()
fusion_time = time.time() - start_time

In [None]:
print("Performance Comparison:")
print(f"Individual operators: {parallel_time:.2f} seconds ({len(parallel_dataset)} samples)")
print(f"Fused operator:       {fusion_time:.2f} seconds ({len(dataset_fusion)} samples)")
print(f"Performance improvement: {((parallel_time - fusion_time) / parallel_time * 100):.1f}%")

assert len(dataset_fusion) == len(parallel_dataset)
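
Part of the benefit of fusion comes from filters that need the same intermediate result, such as a sample's word list. The toy sketch below is not Data-Juicer's fusion code. It counts how often text is tokenized in two setups. In the first, each filter runs in its own pass with a fresh context. In the second, all filters run in one pass and share a per-sample context dictionary. The filter functions and thresholds are hypothetical.

```python
from typing import Callable, Dict, List

split_calls = 0  # counts how often a sample's text is tokenized


def get_words(sample: Dict, context: Dict) -> List[str]:
    """Return the sample's words, tokenizing only if the context lacks them."""
    global split_calls
    if "words" not in context:
        split_calls += 1
        context["words"] = sample["text"].split()
    return context["words"]


def has_min_words(sample: Dict, ctx: Dict) -> bool:
    return len(get_words(sample, ctx)) >= 3


def low_repetition(sample: Dict, ctx: Dict) -> bool:
    words = get_words(sample, ctx)
    return len(set(words)) / max(len(words), 1) >= 0.5


Filter = Callable[[Dict, Dict], bool]


def run_unfused(samples: List[Dict], filters: List[Filter]) -> List[Dict]:
    # One pass per filter; every call gets a fresh context, so text is re-tokenized
    for f in filters:
        samples = [s for s in samples if f(s, {})]
    return samples


def run_fused(samples: List[Dict], filters: List[Filter]) -> List[Dict]:
    # A single pass; all filters for a sample share one context
    kept = []
    for s in samples:
        ctx: Dict = {}
        if all(f(s, ctx) for f in filters):
            kept.append(s)
    return kept


data = [{"text": "a b"}, {"text": "one two three four"}, {"text": "x x x x x x"}]
filters = [has_min_words, low_repetition]

split_calls = 0
unfused = run_unfused(data, filters)
unfused_calls = split_calls

split_calls = 0
fused = run_fused(data, filters)
fused_calls = split_calls

print(unfused == fused, unfused_calls, fused_calls)  # True 5 3
```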

### 3. Additional Optimization Techniques

Data-Juicer offers several other configuration options that can be combined with the techniques above:

#### Caching and Checkpointing for Fault Tolerance

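For long-running pipelines, checkpointing (or, alternatively, caching) can save significant time if the process is interrupted, because a rerun can reuse intermediate results instead of starting over. The two options are mutually exclusive: enabling checkpointing disables the cache.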

```yaml
project_name: 'robust_processing_example'
dataset_path: './data/large_dataset.jsonl'
export_path: './outputs/processed_dataset.jsonl'

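# Option 1: Use checkpointing (cache will be disabled automatically).
# Checkpoints are saved under the work directory; temp_dir holds the
# temporary intermediate files created while the cache is disabled.
use_checkpoint: true
temp_dir: './temp_checkpoints'

# Option 2: Use caching (uncomment below and set use_checkpoint to false)
# use_cache: true
# ds_cache_dir: './cache'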

process:
  - document_deduplicator:
      lowercase: true
  - text_length_filter:
      min_len: 10
      max_len: 10000
```
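
The resume behavior can be modeled in a few lines. This toy sketch rests on two assumptions. First, rerunning the same recipe skips the operators already covered by the checkpoint. Second, changing an earlier operator's arguments forces a full rerun. The JSON checkpoint file, the recipe entries such as `lower_mapper`, and the helper functions are hypothetical and do not reflect Data-Juicer's on-disk format.

```python
import json
import os
import tempfile
from typing import Callable, Dict, List, Tuple

Op = Tuple[Dict, Callable[[List[str]], List[str]]]  # (recipe entry, function)


def run_with_checkpoint(data: List[str], ops: List[Op], ckpt_path: str) -> List[str]:
    specs = [spec for spec, _ in ops]
    start = 0
    if os.path.exists(ckpt_path):
        with open(ckpt_path, encoding="utf-8") as f:
            ckpt = json.load(f)
        done = ckpt["done_ops"]
        if specs[: len(done)] == done:
            # Same ops so far: reload the saved data and skip those ops
            data, start = ckpt["data"], len(done)
        # Otherwise an earlier op changed, so everything reruns from scratch
    for i in range(start, len(ops)):
        data = ops[i][1](data)
        # Save the data together with the recipe entries applied so far
        with open(ckpt_path, "w", encoding="utf-8") as f:
            json.dump({"done_ops": specs[: i + 1], "data": data}, f)
    return data


calls = []


def lower(d):
    calls.append("lower")
    return [s.lower() for s in d]


def drop_short(d):
    calls.append("drop_short")
    return [s for s in d if len(s) >= 5]


ckpt = os.path.join(tempfile.mkdtemp(), "ckpt.json")
recipe = [({"lower_mapper": {}}, lower), ({"len_filter": {"min_len": 5}}, drop_short)]
run_with_checkpoint(["Hello World", "Hi"], recipe, ckpt)  # runs both ops
calls.clear()
extended = recipe + [({"strip_mapper": {}}, lambda d: [s.strip() for s in d])]
print(run_with_checkpoint(["Hello World", "Hi"], extended, ckpt), calls)
# ['hello world'] []
```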

#### Disk-Efficient Processing with Compressed Caching

When disk space is limited, you can compress the cache files to reduce their storage footprint, at the cost of extra CPU time for compression and decompression:

```yaml
project_name: 'memory_efficient_example'
dataset_path: './data/large_dataset.jsonl'
export_path: './outputs/processed_dataset.jsonl'

# Compress cache files to reduce disk usage
cache_compress: 'gzip'

process:
  - clean_html_mapper: {}
  - language_id_score_filter:
      lang: 'en'
      min_score: 0.8
```
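
The trade-off behind `cache_compress` is ordinary file compression: less disk space in exchange for CPU time. The sketch below illustrates that trade-off with Python's standard `gzip` module on a JSONL file. It does not touch Data-Juicer's cache files, and the exact ratio depends heavily on the data.

```python
import gzip
import json
import os
import tempfile

# Build a small, repetitive JSONL payload (similar in spirit to the demo data)
records = [{"text": f"This is a high quality English text for sample {i}."} for i in range(1000)]
payload = "".join(json.dumps(r) + "\n" for r in records).encode("utf-8")

out_dir = tempfile.mkdtemp()
raw_path = os.path.join(out_dir, "cache.jsonl")
gz_path = raw_path + ".gz"
with open(raw_path, "wb") as f:
    f.write(payload)
with gzip.open(gz_path, "wb") as f:
    f.write(payload)

raw_size, gz_size = os.path.getsize(raw_path), os.path.getsize(gz_path)
print(f"raw: {raw_size} bytes, gzip: {gz_size} bytes ({gz_size / raw_size:.1%})")

# Decompressing restores the exact bytes, at the price of extra CPU time
with gzip.open(gz_path, "rb") as f:
    assert f.read() == payload
```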

#### Large-Scale Data Sharding

For extremely large datasets, you can shard the export to manage file sizes and facilitate parallel processing workflows:

```yaml
project_name: 'sharded_export_example'
dataset_path: './data/very_large_dataset.jsonl'
export_path: './outputs/sharded_dataset.jsonl'

# Split the output into shards of at most ~1 GiB each. export_shard_size is in bytes;
# the default 0 exports everything to a single file.
# Parallel export across shards happens automatically.
export_shard_size: 1073741824

process:
  - text_length_filter:
      min_len: 10
      max_len: 10000
```
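
Since `export_shard_size` is measured in bytes, sharding amounts to starting a new output file whenever the next record would push the current file past that limit. The following is a minimal sketch of that idea for JSONL output, not Data-Juicer's exporter. The function name and the `-00000` shard-naming scheme are hypothetical.

```python
import json
import os
import tempfile
from typing import Dict, Iterable, List


def export_sharded(records: Iterable[Dict], export_path: str, max_shard_bytes: int) -> List[str]:
    """Write JSONL shards that stay within max_shard_bytes
    (a single record larger than the limit still gets its own shard)."""
    stem, ext = os.path.splitext(export_path)
    paths: List[str] = []
    buf: List[bytes] = []
    buf_size = 0

    def flush() -> None:
        nonlocal buf, buf_size
        if not buf:
            return
        path = f"{stem}-{len(paths):05d}{ext}"  # hypothetical shard naming
        with open(path, "wb") as f:
            f.writelines(buf)
        paths.append(path)
        buf, buf_size = [], 0

    for rec in records:
        line = (json.dumps(rec, ensure_ascii=False) + "\n").encode("utf-8")
        if buf and buf_size + len(line) > max_shard_bytes:
            flush()
        buf.append(line)
        buf_size += len(line)
    flush()
    return paths


out = os.path.join(tempfile.mkdtemp(), "sharded_dataset.jsonl")
shards = export_sharded(({"text": "x" * 100} for _ in range(50)), out, max_shard_bytes=1024)
print(len(shards), [os.path.getsize(p) for p in shards])
# 6 [1017, 1017, 1017, 1017, 1017, 565]
```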

These optimization techniques can significantly improve the performance of your Data-Juicer pipelines. For detailed configuration options and advanced settings, please refer to [config_all.yaml](https://github.com/modelscope/data-juicer/blob/main/configs/config_all.yaml).

## Next Steps

Continue with the next notebook in the series to learn about data quality analysis and visualization with Data-Juicer's Analyzer module, which helps you understand dataset characteristics and make informed processing decisions.