# Chapter 7: Distributed Processing with Ray

**Data-Juicer User Guide**

- Git Commit: `v1.4.5`
- Commit Date: 2026-01-16
- Repository: https://github.com/datajuicer/data-juicer

# Table of Contents

1. [Setup](#setup)
2. [Explore Demo Configurations](#explore-demo-configurations)
3. [Run Distributed Processing](#run-distributed-processing)
   - [Programmatic Execution with Ray](#programmatic-execution-with-ray)
4. [Monitor Resources](#monitor-resources)
5. [Ray Dashboard](#ray-dashboard)
6. [Multi-Node Cluster Setup](#multi-node-cluster-setup)
7. [Try Deduplication Demo](#try-deduplication-demo)
8. [Performance Tips](#performance-tips)
9. [Cleanup](#cleanup)
10. [Further Reading](#further-reading)

## Setup

### Clone Data-Juicer Repository

First, let's clone the Data-Juicer repository to access the demo configurations and data:

In [None]:
!git clone --depth 1 https://github.com/datajuicer/data-juicer.git

In [None]:
# Install Data-Juicer with Ray support
# (the quotes keep shells such as zsh from treating the brackets as a glob pattern)
!uv pip install "py-data-juicer[distributed]"

In [None]:
%cd data-juicer

### Setup Ray Cluster

In [None]:
# To start a local Ray cluster, uncomment the line below,
# or run `ray start --head` (without the leading `!`) in a terminal:
# !ray start --head

In [None]:
# Check Ray cluster status
!ray status

## Explore Demo Configurations

Data-Juicer provides ready-to-use demo configurations in `demos/process_on_ray/`:

In [None]:
# List available demo configs
!ls -lh demos/process_on_ray/configs/

In [None]:
# View the demo configuration
!cat demos/process_on_ray/configs/demo.yaml
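
Before launching a run, it can help to confirm that a config carries the top-level fields this chapter relies on. The sketch below is a minimal, hypothetical helper (`check_config` is not part of Data-Juicer) that works on a plain dict such as the one `yaml.safe_load` returns. The sample values are placeholders, not the contents of `demo.yaml`.

```python
# Hypothetical helper, not a Data-Juicer API: checks a config dict for the
# top-level keys used in this chapter.
EXPECTED_KEYS = ("dataset_path", "export_path", "executor_type", "process")


def check_config(config: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = [f"missing key: {key}" for key in EXPECTED_KEYS if key not in config]
    if config.get("executor_type") not in (None, "ray"):
        problems.append(f"executor_type is {config['executor_type']!r}, expected 'ray'")
    if "process" in config and not isinstance(config["process"], list):
        problems.append("process should be a list of operator entries")
    return problems


# Placeholder values for illustration only.
example_config = {
    "project_name": "example",
    "dataset_path": "path/to/input.jsonl",
    "export_path": "path/to/output",
    "executor_type": "ray",
    "process": [],
}
print(check_config(example_config))
print(check_config({"executor_type": "default"}))
```

The checks are deliberately shallow: they only look at key presence and basic types, and leave operator-level validation to Data-Juicer itself.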

## Run Distributed Processing

Now let's run the distributed processing using the demo configuration:

In [None]:
# Process with Ray using demo config
!dj-process --config demos/process_on_ray/configs/demo.yaml

In [None]:
# View sample processed data
import os
import json

output_dir = 'outputs/demo/demo-processed'
try:
    sample_files = os.listdir(output_dir)
    print(f"Sample files count: {len(sample_files)}")
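    for sample_file in sample_files:
        with open(os.path.join(output_dir, sample_file), 'r') as f:
            print(f"Sample file: {sample_file}")
            # The export is expected to be JSON Lines (one record per line),
            # so parse line by line and show only the first few records
            for i, line in enumerate(f):
                if i >= 3:
                    break
                print(json.dumps(json.loads(line), indent=4, ensure_ascii=False))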
except FileNotFoundError:
    print("Output directory not found")
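
The inspection cell above can be generalized into a small reusable helper. The following sketch assumes the export directory holds JSON Lines files (one JSON object per line). `preview_jsonl_dir` is a hypothetical name, and the demo at the bottom writes its own temporary file so that it runs without a prior Data-Juicer export.

```python
import json
import os
import tempfile


def preview_jsonl_dir(directory: str, max_records: int = 3) -> list[dict]:
    """Return up to `max_records` parsed records from each file in `directory`."""
    records = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if not os.path.isfile(path):
            continue  # skip sub-directories
        with open(path, "r", encoding="utf-8") as f:
            taken = 0
            for line in f:
                if taken >= max_records:
                    break
                if line.strip():  # ignore blank lines
                    records.append(json.loads(line))
                    taken += 1
    return records


# Self-contained demo on a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "part-0.jsonl"), "w", encoding="utf-8") as f:
        for i in range(5):
            f.write(json.dumps({"text": f"sample {i}"}) + "\n")
    preview = preview_jsonl_dir(tmp, max_records=2)
    print(preview)
```

To use it on real output, pass the export directory, for example `preview_jsonl_dir('outputs/demo/demo-processed')`.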
    

### Programmatic Execution with Ray

Alternatively, you can run the Ray pipeline programmatically in Python. This approach loads the YAML config as a Python dict and uses Data-Juicer's low-level APIs for maximum flexibility:

In [None]:
import json
import os

import yaml
import ray
from data_juicer.ops import load_ops
from data_juicer.core.data.dataset_builder import DatasetBuilder
from data_juicer.core.ray_exporter import RayExporter
from jsonargparse import Namespace

# Step 1: Load YAML config as Python dict
with open('demos/process_on_ray/configs/demo.yaml', 'r') as f:
    config_dict = yaml.safe_load(f)

print("Loaded config:")
print(f"  Project: {config_dict.get('project_name')}")
print(f"  Dataset path: {config_dict.get('dataset_path')}")
print(f"  Export path: {config_dict.get('export_path')}")
print(f"  Executor type: {config_dict.get('executor_type')}")
print(f"  Process operators: {len(config_dict.get('process', []))}")

In [None]:
# Step 2: Initialize Ray cluster
ray.init(ignore_reinit_error=True)
print(f"Ray initialized: {ray.is_initialized()}")

# Step 3: Load dataset as Ray Dataset
# Extract dataset_path from config dict
ds_cfg = Namespace({"dataset_path": config_dict["dataset_path"]})
dataset_builder = DatasetBuilder(ds_cfg, executor_type=config_dict.get("executor_type"))

ds = dataset_builder.load_dataset()
print(f"Loaded dataset with {ds.data.count()} samples")

In [None]:
# Step 4: Extract process list from config dict and load operators
process_list = config_dict["process"]
print(f"Process list: {process_list}")

ops = load_ops(process_list)
print(f"Loaded {len(ops)} operators: {[op._name for op in ops]}")

In [None]:
# Step 5: Process dataset through operators using RayDataset.process()
ds.process(ops)
print(f"Processing complete. Remaining samples: {ds.data.count()}")

In [None]:
# Display results
print("Processed data:")
for i, sample in enumerate(ds.data.take(5), 1):
    print(f"{i}. {sample}")

In [None]:
# Step 6: Export results using RayExporter
# The export path is set here directly rather than read from the config dict
export_path = os.path.abspath('./outputs/ray_programmatic/processed')
os.makedirs(export_path, exist_ok=True)

exporter = RayExporter(
    export_path=export_path,
    export_type="jsonl"
)
exporter.export(ds.data, columns=ds.data.columns())
print(f"Export complete to: {export_path}")

In [None]:
try:
    sample_files = os.listdir(export_path)
    print(f"Sample files count: {len(sample_files)}")
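    for sample_file in sample_files:
        with open(os.path.join(export_path, sample_file), 'r') as f:
            print(f"Sample file: {sample_file}")
            # The export type is "jsonl", so parse one record per line
            for i, line in enumerate(f):
                if i >= 3:
                    break
                print(json.dumps(json.loads(line), indent=4, ensure_ascii=False))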
except FileNotFoundError:
    print("Output directory not found")

Both execution methods produce the same filtered dataset:
- **Command-line with YAML**: Simple and quick for one-off processing with config files
- **Programmatic with Python**: Load YAML as dict and use Python API - ideal for:
  - Integration into larger Python workflows
  - Fine-grained control over each processing step
  - Dynamic operator configuration at runtime
  - Debugging and step-by-step inspection

## Monitor Resources

In [None]:
# Check resource usage
import ray
from data_juicer.utils.ray_utils import ray_cpu_count, ray_gpu_count

ray.init(ignore_reinit_error=True)

print(f"Total CPUs: {ray_cpu_count()}")
print(f"Total GPUs: {ray_gpu_count()}")
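
Ray itself also reports resources as dictionaries, through `ray.cluster_resources()` (totals) and `ray.available_resources()` (currently free). The sketch below turns such a pair of dictionaries into a usage summary. `summarize_usage` is a hypothetical helper, and the numbers are made-up placeholders so that the block runs without a cluster.

```python
def summarize_usage(total: dict, available: dict, keys=("CPU", "GPU")) -> dict:
    """Compute used amounts and utilization for the selected resource keys."""
    summary = {}
    for key in keys:
        cap = float(total.get(key, 0))
        free = float(available.get(key, 0))
        used = cap - free
        summary[key] = {
            "total": cap,
            "used": used,
            # Guard against division by zero when a resource is absent.
            "utilization": used / cap if cap else 0.0,
        }
    return summary


# Placeholder numbers; on a live cluster, pass ray.cluster_resources()
# and ray.available_resources() instead.
total = {"CPU": 8.0, "memory": 16e9}
available = {"CPU": 6.0, "memory": 12e9}
for name, stats in summarize_usage(total, available).items():
    print(f"{name}: {stats['used']:.0f}/{stats['total']:.0f} used "
          f"({stats['utilization']:.0%})")
```

A resource that the cluster does not expose, such as `GPU` on a CPU-only machine, is reported with a total of zero rather than raising an error.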

## Ray Dashboard

Access the Ray Dashboard at `http://localhost:8265`, the default address when the head node runs on the local machine.

The dashboard provides:
- Real-time resource utilization
- Task execution timeline
- Memory usage statistics
- Error logs and debugging info
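
If the page does not load, a quick reachability check can tell whether anything is answering at that address. This is a minimal sketch using only the standard library. `dashboard_reachable` is a hypothetical helper, and it only confirms that an HTTP server responds, not that the responder is actually Ray.

```python
import http.client
import urllib.error
import urllib.request


def dashboard_reachable(url: str = "http://localhost:8265", timeout: float = 2.0) -> bool:
    """Return True if an HTTP server answers at `url` with a non-error status."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 400
    except (urllib.error.URLError, http.client.HTTPException, OSError):
        # Connection refused, timeout, HTTP error status, or a non-HTTP reply.
        return False


if dashboard_reachable():
    print("Dashboard is up")
else:
    print("Dashboard not reachable; is the Ray head node running?")
```

When the head node runs on another machine, pass its address explicitly, for example `dashboard_reachable("http://<head-node-ip>:8265")`.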

## Multi-Node Cluster Setup

In [None]:
print("Multi-node Ray cluster setup:")
print("""
# On head node:
ray start --head --port=6379 --num-cpus=8

# On worker nodes:
ray start --address='<head-node-ip>:6379' --num-cpus=8

# In Data-Juicer config:
executor_type: 'ray'
ray_address: '<head-node-ip>:6379'
""")

## Try Deduplication Demo

Data-Juicer also provides a deduplication demo using Ray:

In [None]:
# View deduplication config
!cat demos/process_on_ray/configs/dedup.yaml

In [None]:
# Check input directory
!ls -lh demos/process_on_ray/data

In [None]:
# Run deduplication
!dj-process --config demos/process_on_ray/configs/dedup.yaml

In [None]:
# Check output directory
!ls -lh outputs/demo-dedup/demo-ray-bts-dedup-processed

In [None]:
# View sample processed data
import os
import json
output_dir = 'outputs/demo-dedup/demo-ray-bts-dedup-processed'
try:
    sample_files = os.listdir(output_dir)
    print(f"Sample files count: {len(sample_files)}")
    for sample_file in sample_files:
        with open(os.path.join(output_dir, sample_file), 'r') as f:
            for i, line in enumerate(f):
                if i < 3:
                    print(json.dumps(json.loads(line), ensure_ascii=False))
except FileNotFoundError:
    print("Output directory not found")
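
To quantify what deduplication removed, the record counts before and after the run can be compared. The sketch below counts non-empty lines, assuming both input and output are JSON Lines. `count_jsonl_records` is a hypothetical helper, and the demo uses temporary files with made-up records so that it runs on its own.

```python
import os
import tempfile


def count_jsonl_records(path: str) -> int:
    """Count non-empty lines in a JSONL file, or in every file under a directory."""
    if os.path.isdir(path):
        return sum(
            count_jsonl_records(os.path.join(path, name))
            for name in sorted(os.listdir(path))
        )
    with open(path, "r", encoding="utf-8") as f:
        return sum(1 for line in f if line.strip())


# Self-contained demo with made-up records.
with tempfile.TemporaryDirectory() as tmp:
    before_path = os.path.join(tmp, "input.jsonl")
    after_dir = os.path.join(tmp, "output")
    os.makedirs(after_dir)
    with open(before_path, "w", encoding="utf-8") as f:
        f.write('{"text": "a"}\n{"text": "a"}\n{"text": "b"}\n{"text": "c"}\n')
    with open(os.path.join(after_dir, "part-0.jsonl"), "w", encoding="utf-8") as f:
        f.write('{"text": "a"}\n{"text": "b"}\n{"text": "c"}\n')
    before = count_jsonl_records(before_path)
    after = count_jsonl_records(after_dir)
    print(f"{before} -> {after} records ({before - after} removed)")
```

For the demo run, point the first call at the input named by `dataset_path` in `dedup.yaml` and the second at the output directory listed above.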

## Performance Tips

Performance optimization tips for Ray processing:

1. **Shard Size**: Adjust `export_shard_size` based on dataset size
   - Smaller shards (100-1000): Better for fault tolerance
   - Larger shards (5000-10000): Better for throughput

2. **Caching**: Enable caching for repeated operations with `use_cache: true` and `cache_compress: 'gzip'`

3. **Operator Fusion**: Combine compatible operators with `op_fusion: true`

4. **Resource Allocation**: Match workers to available resources
   - CPU-bound ops: More workers
   - GPU-bound ops: Fewer workers with GPU allocation

5. **Monitoring**: Use the Ray Dashboard at `http://localhost:8265`
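
When running programmatically, the options named in these tips can be layered onto a loaded config dict. The following sketch restricts overrides to the keys mentioned above. `apply_tuning` is a hypothetical helper and the values are placeholders. Whether each option takes effect under the Ray executor may depend on the Data-Juicer version, so treat this as a starting point rather than a recommended setting.

```python
# Keys taken from the tips above; the helper itself is hypothetical.
TUNING_KEYS = {"export_shard_size", "use_cache", "cache_compress", "op_fusion"}


def apply_tuning(config: dict, **overrides) -> dict:
    """Return a new config dict with the given tuning options applied."""
    unknown = set(overrides) - TUNING_KEYS
    if unknown:
        raise ValueError(f"unsupported tuning keys: {sorted(unknown)}")
    return {**config, **overrides}


# Placeholder base config for illustration.
base = {"executor_type": "ray", "process": []}
tuned = apply_tuning(base, use_cache=True, cache_compress="gzip", op_fusion=True)
print(tuned)
```

Rejecting unknown keys catches typos early, since a misspelled option would otherwise sit silently in the dict.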

## Cleanup

In [None]:
# Stop Ray cluster
# !ray stop

In [None]:
# Return to the parent directory (the notebook changed into data-juicer earlier),
# then remove the cloned Data-Juicer repository
%cd ..
!rm -rf data-juicer

## Further Reading

- [Distributed Processing Documentation](https://datajuicer.github.io/data-juicer/en/main/docs/Distributed.html)
- [Ray Documentation](https://docs.ray.io/)