# Chapter 7: Distributed Processing with Ray

**Data-Juicer User Guide**

- Git Commit: `v1.4.6`
- Commit Date: 2026-02-02
- Repository: https://github.com/datajuicer/data-juicer

# Table of Contents

1. [Setup](#setup)
2. [Explore Demo Configurations](#explore-demo-configurations)
3. [Run Distributed Processing](#run-distributed-processing)
   - [Programmatic Execution with Ray](#programmatic-execution-with-ray)
4. [Monitor Resources](#monitor-resources)
5. [Ray Dashboard](#ray-dashboard)
6. [Multi-Node Cluster Setup](#multi-node-cluster-setup)
7. [Try Deduplication Demo](#try-deduplication-demo)
8. [Performance Tips](#performance-tips)
9. [Cleanup](#cleanup)
10. [Further Reading](#further-reading)

## Setup

### Clone Data-Juicer Repository

First, let's clone the Data-Juicer repository to access the demo configurations and data:

In [1]:
!git clone --depth 1 https://github.com/datajuicer/data-juicer.git

Cloning into 'data-juicer'...
remote: Enumerating objects: 1246, done.
remote: Counting objects: 100% (1246/1246), done.
remote: Compressing objects: 100% (932/932), done.
remote: Total 1246 (delta 361), reused 818 (delta 285), pack-reused 0 (from 0)
Receiving objects: 100% (1246/1246), 34.37 MiB | 39.36 MiB/s, done.
Resolving deltas: 100% (361/361), done.


In [2]:
# Install Data-Juicer with Ray support
# If running in Google Colab, use 'pip install' instead of 'uv pip install'
!uv pip install "py-data-juicer[distributed]"

Resolved 192 packages in 6.11s
   Building pyspark==3.5.5
Preparing packages... (0/35)

In [3]:
%cd data-juicer

/workspaces/data-juicer-hub/data-juicer


### Setup Ray Cluster

In [4]:
# To start a local Ray cluster, run `ray start --head` in a terminal,
# or uncomment the line below to run it from the notebook:
# !ray start --head

In [5]:
# Check Ray cluster status
!ray status

Node status
---------------------------------------------------------------
Active:
 1 node_7ea4651dbeafcb2dc1d2dc1e8c8d3820670fb5f5c2547cf000bfb8a8
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Total Usage:
 0.0/4.0 CPU
 0B/8.06GiB memory
 0B/3.46GiB object_store_memory

From request_resources:
 (none)
Pending Demands:
 (no resource demands)
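
If you want to read these numbers from a script rather than by eye, the `Total Usage` lines can be parsed with a small helper. The sketch below is only an illustration: `parse_ray_usage` is a hypothetical name, and it relies on the `used/total NAME` layout shown in the output above, which is not a stable interface and may differ between Ray versions.

```python
import re

# Matches usage lines such as " 0.0/4.0 CPU" or " 0B/8.06GiB memory".
# The layout is copied from the `ray status` output above; treat it as an assumption.
USAGE_LINE = re.compile(r"^\s*(\S+)/(\S+)\s+(\S+)\s*$")


def parse_ray_usage(status_text):
    """Return {resource_name: (used, total)} with values kept as strings."""
    usage = {}
    for line in status_text.splitlines():
        match = USAGE_LINE.match(line)
        if match:
            used, total, name = match.groups()
            usage[name] = (used, total)
    return usage


# A fragment of the status output shown above.
sample = """Total Usage:
 0.0/4.0 CPU
 0B/8.06GiB memory
 0B/3.46GiB object_store_memory
"""
print(parse_ray_usage(sample))
```

Values are kept as strings on purpose, because memory figures carry unit suffixes such as `GiB` that would need their own conversion.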

## Explore Demo Configurations

Data-Juicer provides ready-to-use demo configurations in `demos/process_on_ray/`:

In [6]:
# List available demo configs
!ls -lh demos/process_on_ray/configs/

total 20K
-rw-rw-rw- 1 vscode vscode  390 Feb 12 09:30 dedup.yaml
-rw-rw-rw- 1 vscode vscode 6.1K Feb 12 09:30 demo-new-config.yaml
-rw-rw-rw- 1 vscode vscode 6.0K Feb 12 09:30 demo.yaml


In [7]:
# View the demo configuration
!cat demos/process_on_ray/configs/demo.yaml

# Process config example for dataset

# global parameters
project_name: 'ray-demo'
dataset_path: './demos/process_on_ray/data/demo-dataset.jsonl'  # path to your dataset directory or file
export_path: './outputs/demo/demo-processed'

executor_type: 'ray'
ray_address: 'auto'                     # change to your ray cluster address, e.g., ray://<hostname>:<port>

# process schedule
# a list of several process operators with their arguments
process:
  # Filter ops
  - alphanumeric_filter:                                    # filter text with alphabet/numeric ratio out of specific range.
      tokenization: false                                     # Whether to count the ratio of alphanumeric to the total number of tokens.
      min_ratio: 0.0                                          # the min ratio of filter range
      max_ratio: 0.9                                          # the max ratio of filter range
  - average_line_length_filter:                             # filter text with the 

## Run Distributed Processing

Now let's run the distributed processing using the demo configuration:

In [8]:
# Process with Ray using demo config
!dj-process --config demos/process_on_ray/configs/demo.yaml

2026-02-12 09:32:49.408 | INFO     | data_juicer.config.config:695 - dataset_path config is set and a valid local path
2026-02-12 09:32:49,418	INFO worker.py:1821 -- Connecting to existing Ray cluster at address: 10.0.0.151:6379...
2026-02-12 09:32:49,429	INFO worker.py:1998 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265
2026-02-12 09:32:50.341 | INFO     | data_juicer.config.config:1012 - Back up the input config file [/workspaces/data-juicer-hub/data-juicer/demos/process_on_ray/configs/demo.yaml] into the work_dir [/workspaces/data-juicer-hub/data-juicer/outputs/demo]
2026-02-12 09:32:50.351 | INFO     | data_juicer.config.config:1033 - Configuration table:
╒══════════════════════════╤═════════════════════════════════════════════════════════════════════════════════════════════════════════╕
│ key                

In [9]:
# View sample processed data
import os
import json

output_dir = 'outputs/demo/demo-processed'
try:
    sample_files = os.listdir(output_dir)
    print(f"Sample files count: {len(sample_files)}")
    for sample_file in sample_files:
        with open(os.path.join(output_dir, sample_file), 'r') as f:
            print(f"Sample file: {sample_file}")
            # Ray writes each shard as JSON Lines, so parse one record per line
            for line in f:
                if line.strip():
                    print(json.dumps(json.loads(line), indent=4))
except FileNotFoundError:
    print("Output directory not found")

Sample files count: 1
Sample file: 26_34e199c47fd9417bb10fe2c4ddb75c32_000000_000000.json
{
    "text": "What\u2019s one thing you wish everyone knew about the brain?\nibble\nWhat\u2019s one thing you wish everyone knew about the brain?\nThe place to have real conversations and understand each other better. Join a community or build and grow your own with groups, threads, and conversations.\nSee this content immediately after install\nGet The App\n",
    "__dj__stats__": {
        "alnum_ratio": 0.8096676737,
        "avg_line_length": 55.1666666667,
        "char_rep_ratio": 0.099378882,
        "flagged_words_ratio": 0.0,
        "lang": "en",
        "lang_score": 0.9424384832,
        "max_line_length": 155,
        "num_words": 55,
        "perplexity": 1274.5,
        "special_char_ratio": 0.1903323263,
        "stopwords_ratio": 0.4909090909,
        "text_len": 331,
        "word_rep_ratio": 0.0434782609
    }
}


### Programmatic Execution with Ray

Alternatively, you can run the Ray pipeline programmatically in Python. This approach loads the YAML config as a Python dict and uses Data-Juicer's low-level APIs for maximum flexibility:

In [10]:
import yaml
import ray
from data_juicer.ops import load_ops
from data_juicer.core.data.dataset_builder import DatasetBuilder
from data_juicer.core.ray_exporter import RayExporter
from jsonargparse import Namespace

# Step 1: Load YAML config as Python dict
with open('demos/process_on_ray/configs/demo.yaml', 'r') as f:
    config_dict = yaml.safe_load(f)

print("Loaded config:")
print(f"  Project: {config_dict.get('project_name')}")
print(f"  Dataset path: {config_dict.get('dataset_path')}")
print(f"  Export path: {config_dict.get('export_path')}")
print(f"  Executor type: {config_dict.get('executor_type')}")
print(f"  Process operators: {len(config_dict.get('process', []))}")

  from .autonotebook import tqdm as notebook_tqdm
2026-02-12 09:34:21,617	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2026-02-12 09:34:29,311	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


Loaded config:
  Project: ray-demo
  Dataset path: ./demos/process_on_ray/data/demo-dataset.jsonl
  Export path: ./outputs/demo/demo-processed
  Executor type: ray
  Process operators: 12


In [11]:
# Step 2: Initialize Ray cluster
ray.init(ignore_reinit_error=True)
print(f"Ray initialized: {ray.is_initialized()}")

# Step 3: Load dataset as Ray Dataset
# Extract dataset_path from config dict
ds_cfg = Namespace({"dataset_path": config_dict["dataset_path"]})
dataset_builder = DatasetBuilder(ds_cfg, executor_type=config_dict.get("executor_type"))

ds = dataset_builder.load_dataset()
print(f"Loaded dataset with {ds.data.count()} samples")

2026-02-12 09:34:33,499	INFO worker.py:1821 -- Connecting to existing Ray cluster at address: 10.0.0.151:6379...
2026-02-12 09:34:33,536	INFO worker.py:1998 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265
2026-02-12 09:34:33.562 | INFO     | data_juicer.core.data.dataset_builder:__init__:48 - found dataset_path setting: ./demos/process_on_ray/data/demo-dataset.jsonl
2026-02-12 09:34:33.563 | INFO     | data_juicer.core.data.load_strategy:get_strategy_class:84 - Getting strategy class for exec: ray, data_type: local, data_source: None
2026-02-12 09:34:33.564 | INFO     | data_juicer.core.data.load_strategy:load_data:236 - Using resolved path for loading ray dataset: /workspaces/data-juicer-hub/data-juicer/demos/process_on_ray/data/demo-dataset.jsonl
2026-02-12 09:34:33.564 | I

Ray initialized: True


2026-02-12 09:34:38,778	INFO progress_bar.py:215 -- ReadJSONStream->SplitBlocks(8): Tasks: 1; Actors: 0; Queued blocks: 0 (0.0B); Resources: 1.0 CPU, 384.0MiB object store: Progress Completed 0 / ?
2026-02-12 09:34:38,779	INFO progress_bar.py:215 -- limit=1: Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 0.0B object store: Progress Completed 0 / ?
2026-02-12 09:34:38,780	INFO progress_bar.py:215 -- Running Dataset: dataset_31_0. Active & requested resources: 1/4 CPU, 384.0MiB/1.7GiB object store: Progress Completed 0 / ?
2026-02-12 09:34:39,290	INFO streaming_executor.py:304 -- ✔️  Dataset dataset_31_0 execution finished in 5.61 seconds
2026-02-12 09:34:39,296	INFO logging.py:397 -- Registered dataset logger for dataset dataset_32_0
2026-02-12 09:34:39,301	INFO streaming_executor.py:178 -- Starting execution of Dataset dataset_32_0. Full logs are in /tmp/ray/session_2026-02-12_09-32-07_983799_13730/logs/ray-data
2026-02-12 09:34:39,302	INFO streaming_executor.py:179 

Loaded dataset with 11 samples


In [12]:
# Step 4: Extract process list from config dict and load operators
process_list = config_dict["process"]
print(f"Process list: {process_list}")

ops = load_ops(process_list)
print(f"Loaded {len(ops)} operators: {[op._name for op in ops]}")

2026-02-12 09:34:47.238 | INFO     | data_juicer.utils.model_utils:prepare_fasttext_model:559 - Loading fasttext language identification model...


Process list: [{'alphanumeric_filter': {'tokenization': False, 'min_ratio': 0.0, 'max_ratio': 0.9}}, {'average_line_length_filter': {'min_len': 10, 'max_len': 10000}}, {'character_repetition_filter': {'rep_len': 10, 'min_ratio': 0.0, 'max_ratio': 0.5}}, {'flagged_words_filter': {'lang': 'en', 'tokenization': False, 'max_ratio': 0.0045, 'flagged_words_dir': './assets', 'use_words_aug': False, 'words_aug_group_sizes': [2], 'words_aug_join_char': ''}}, {'language_id_score_filter': {'lang': 'en', 'min_score': 0.8}}, {'maximum_line_length_filter': {'min_len': 10, 'max_len': 10000}}, {'perplexity_filter': {'lang': 'en', 'max_ppl': 1500}}, {'special_characters_filter': {'min_ratio': 0.0, 'max_ratio': 0.25}}, {'stopwords_filter': {'lang': 'en', 'tokenization': False, 'min_ratio': 0.3, 'stopwords_dir': './assets', 'use_words_aug': False, 'words_aug_group_sizes': [2], 'words_aug_join_char': ''}}, {'text_length_filter': {'min_len': 10, 'max_len': 10000}}, {'words_num_filter': {'lang': 'en', 'toke

2026-02-12 09:34:47.630 | INFO     | data_juicer.utils.model_utils:prepare_sentencepiece_model:859 - Loading sentencepiece model...
2026-02-12 09:34:47.680 | INFO     | data_juicer.utils.model_utils:prepare_kenlm_model:703 - Loading kenlm language model...


Loaded 12 operators: ['alphanumeric_filter', 'average_line_length_filter', 'character_repetition_filter', 'flagged_words_filter', 'language_id_score_filter', 'maximum_line_length_filter', 'perplexity_filter', 'special_characters_filter', 'stopwords_filter', 'text_length_filter', 'words_num_filter', 'word_repetition_filter']


In [13]:
# Step 5: Process dataset through operators using RayDataset.process()
ds.process(ops)
print(f"Processing complete. Remaining samples: {ds.data.count()}")

2026-02-12 09:34:52.348 | INFO     | data_juicer.utils.ray_utils:get_ray_nodes_info:96 - Ray nodes:
[{'NodeID': '7ea4651dbeafcb2dc1d2dc1e8c8d3820670fb5f5c2547cf000bfb8a8', 'Alive': True, 'NodeManagerAddress': '10.0.0.151', 'NodeManagerHostname': 'codespaces-94212f', 'NodeManagerPort': 38155, 'ObjectManagerPort': 33057, 'ObjectStoreSocketName': '/tmp/ray/session_2026-02-12_09-32-07_983799_13730/sockets/plasma_store', 'RayletSocketName': '/tmp/ray/session_2026-02-12_09-32-07_983799_13730/sockets/raylet', 'MetricsExportPort': 60390, 'NodeName': '10.0.0.151', 'RuntimeEnvAgentPort': 47864, 'DeathReason': 0, 'DeathReasonMessage': '', 'alive': True, 'Resources': {'memory': 8658465178.0, 'object_store_memory': 3710770790.0, 'CPU': 4.0, 'node:10.0.0.151': 1.0, 'node:__internal_head__': 1.0}, 'Labels': {'ray.io/node-id': '7ea4651dbeafcb2dc1d2dc1e8c8d3820670fb5f5c2547cf000bfb8a8'}}]
2026-02-12 09:34:52.363 | INFO     | data

Processing complete. Remaining samples: 1


In [14]:
# Display results
print("Processed data:")
for i, sample in enumerate(ds.data.take(5), 1):
    print(f"{i}. {sample}")

2026-02-12 09:35:10,820	INFO dataset.py:3641 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2026-02-12 09:35:10,824	INFO logging.py:397 -- Registered dataset logger for dataset dataset_60_0
2026-02-12 09:35:10,826	INFO limit_pushdown.py:140 -- Skipping push down of limit 5 through map MapBatches[MapBatches(filter_batch)] because it requires 1000 rows to produce stable outputs
2026-02-12 09:35:10,836	INFO streaming_executor.py:178 -- Starting execution of Dataset dataset_60_0. Full logs are in /tmp/ray/session_2026-02-12_09-32-07_983799_13730/logs/ray-data
2026-02-12 09:35:10,836	INFO streaming_executor.py:179 -- Execution plan of Dataset dataset_60_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSONStream] -> TaskPoolMapOperator[MapBatches(process_batch_arrow)] -> TaskPoolMapOperator[MapBatches(compute_stats_batched)] -> TaskPoolMapOperator[MapBatches(filter_batch)] -> TaskPoolMapOperator[MapBatches(compute_stats_batched

Processed data:


(MapBatches(compute_stats_batched) pid=16070) 2026-02-12 09:35:11.294 | INFO     | data_juicer.utils.model_utils:prepare_fasttext_model:559 - Loading fasttext language identification model...
(MapBatches(compute_stats_batched) pid=16070) 2026-02-12 09:35:11.585 | INFO     | data_juicer.utils.model_utils:prepare_sentencepiece_model:859 - Loading sentencepiece model...
(MapBatches(compute_stats_batched) pid=16070) 2026-02-12 09:35:11.649 | INFO     | data_juicer.utils.model_utils:prepare_kenlm_model:703 - Loading kenlm language model...
2026-02-12 09:35:12,365	INFO streaming_executor.py:304 -- ✔️  Dataset dataset_60_0 execution finished in 1.52 seconds


1. {'text': 'What’s one thing you wish everyone knew about the brain?\nibble\nWhat’s one thing you wish everyone knew about the brain?\nThe place to have real conversations and understand each other better. Join a community or build and grow your own with groups, threads, and conversations.\nSee this content immediately after install\nGet The App\n', '__dj__stats__': {'alnum_ratio': 0.8096676737160121, 'avg_line_length': 55.166666666666664, 'char_rep_ratio': 0.09937888198757763, 'flagged_words_ratio': 0.0, 'lang': 'en', 'lang_score': 0.9424384832382202, 'max_line_length': 155, 'num_words': 55, 'perplexity': 1274.5, 'special_char_ratio': 0.1903323262839879, 'stopwords_ratio': 0.4909090909090909, 'text_len': 331, 'word_rep_ratio': 0.043478260869565216}}
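
To see why this sample survived, its `__dj__stats__` values can be compared with the bounds in the process list printed in Step 4. The sketch below is a simplified illustration, not Data-Juicer's filtering code: `BOUNDS` and `within_bounds` are hypothetical names, the mapping from stat to operator is inferred from the stat names, bounds are treated as inclusive (an assumption), and only operators whose arguments are fully visible in the output above are included.

```python
# Stats copied from the processed sample shown above (rounded as exported).
stats = {
    "alnum_ratio": 0.8096676737,
    "avg_line_length": 55.1666666667,
    "char_rep_ratio": 0.099378882,
    "flagged_words_ratio": 0.0,
    "lang_score": 0.9424384832,
    "max_line_length": 155,
    "perplexity": 1274.5,
    "special_char_ratio": 0.1903323263,
    "stopwords_ratio": 0.4909090909,
    "text_len": 331,
}

# (min, max) bounds taken from the demo process list; None means no bound is given.
BOUNDS = {
    "alnum_ratio": (0.0, 0.9),              # alphanumeric_filter
    "avg_line_length": (10, 10000),         # average_line_length_filter
    "char_rep_ratio": (0.0, 0.5),           # character_repetition_filter
    "flagged_words_ratio": (None, 0.0045),  # flagged_words_filter
    "lang_score": (0.8, None),              # language_id_score_filter
    "max_line_length": (10, 10000),         # maximum_line_length_filter
    "perplexity": (None, 1500),             # perplexity_filter
    "special_char_ratio": (0.0, 0.25),      # special_characters_filter
    "stopwords_ratio": (0.3, None),         # stopwords_filter
    "text_len": (10, 10000),                # text_length_filter
}


def within_bounds(value, low, high):
    """Inclusive range check; a bound of None is ignored."""
    return (low is None or value >= low) and (high is None or value <= high)


report = {name: within_bounds(stats[name], *BOUNDS[name]) for name in BOUNDS}
for name, ok in report.items():
    print(f"{name:22s} {'keep' if ok else 'drop'}")
print("Sample kept:", all(report.values()))
```

A sample is kept only if every check passes, which is why 10 of the 11 input samples were removed by this fairly strict chain of filters.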


In [15]:
# Step 6: Export results using RayExporter
# Write to a separate directory instead of the config's export_path,
# so the output of the command-line run above is left untouched
export_path = os.path.abspath('./outputs/ray_programmatic/processed')
os.makedirs(export_path, exist_ok=True)

exporter = RayExporter(
    export_path=export_path,
    export_type="jsonl"
)
exporter.export(ds.data, columns=ds.data.columns())
print(f"Export complete to: {export_path}")

2026-02-12 09:35:17,842	INFO logging.py:397 -- Registered dataset logger for dataset dataset_61_0
2026-02-12 09:35:17,844	INFO limit_pushdown.py:140 -- Skipping push down of limit 1 through map MapBatches[MapBatches(filter_batch)] because it requires 1000 rows to produce stable outputs
2026-02-12 09:35:17,851	INFO streaming_executor.py:178 -- Starting execution of Dataset dataset_61_0. Full logs are in /tmp/ray/session_2026-02-12_09-32-07_983799_13730/logs/ray-data
2026-02-12 09:35:17,852	INFO streaming_executor.py:179 -- Execution plan of Dataset dataset_61_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSONStream] -> TaskPoolMapOperator[MapBatches(process_batch_arrow)] -> TaskPoolMapOperator[MapBatches(compute_stats_batched)] -> TaskPoolMapOperator[MapBatches(filter_batch)] -> TaskPoolMapOperator[MapBatches(compute_stats_batched)] -> TaskPoolMapOperator[MapBatches(filter_batch)] -> TaskPoolMapOperator[MapBatches(compute_stats_batched)] -> TaskPoolMapOperator[MapBatches(filter_b

Export complete to: /workspaces/data-juicer-hub/data-juicer/outputs/ray_programmatic/processed


In [16]:
try:
    sample_files = os.listdir(export_path)
    print(f"Sample files count: {len(sample_files)}")
    for sample_file in sample_files:
        with open(os.path.join(export_path, sample_file), 'r') as f:
            print(f"Sample file: {sample_file}")
            # Ray writes each shard as JSON Lines, so parse one record per line
            for line in f:
                if line.strip():
                    print(json.dumps(json.loads(line), indent=4))
except FileNotFoundError:
    print("Output directory not found")

Sample files count: 1
Sample file: 58_15d3b6a570824cdfb2b014c015f12705_000000_000000.json
{
    "text": "What\u2019s one thing you wish everyone knew about the brain?\nibble\nWhat\u2019s one thing you wish everyone knew about the brain?\nThe place to have real conversations and understand each other better. Join a community or build and grow your own with groups, threads, and conversations.\nSee this content immediately after install\nGet The App\n",
    "__dj__stats__": {
        "alnum_ratio": 0.8096676737,
        "avg_line_length": 55.1666666667,
        "char_rep_ratio": 0.099378882,
        "flagged_words_ratio": 0.0,
        "lang": "en",
        "lang_score": 0.9424384832,
        "max_line_length": 155,
        "num_words": 55,
        "perplexity": 1274.5,
        "special_char_ratio": 0.1903323263,
        "stopwords_ratio": 0.4909090909,
        "text_len": 331,
        "word_rep_ratio": 0.0434782609
    }
}


Both execution methods produce the same filtered dataset. Which one to use depends on your workflow:
- **Command-line with YAML**: Simple and quick for one-off processing with config files
- **Programmatic with Python**: Load the YAML as a dict and drive the Python API step by step. This is ideal for:
  - Integration into larger Python workflows
  - Fine-grained control over each processing step
  - Dynamic operator configuration at runtime
  - Debugging and step-by-step inspection
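
As an illustration of dynamic operator configuration: the process list is a plain list of single-key dicts (as printed in Step 4), so it can be edited before it is passed to `load_ops`. The helper below, `override_op_args`, is a hypothetical name introduced for this sketch and is not part of Data-Juicer. It returns a modified copy and leaves the loaded config untouched.

```python
import copy


def override_op_args(process_list, op_name, **new_args):
    """Return a deep copy of process_list with new_args merged into op_name's arguments."""
    updated = copy.deepcopy(process_list)
    found = False
    for entry in updated:
        if op_name in entry:
            # An operator listed without arguments may map to None in YAML.
            entry[op_name] = {**(entry[op_name] or {}), **new_args}
            found = True
    if not found:
        raise KeyError(f"Operator {op_name!r} is not in the process list")
    return updated


# Two entries copied from the demo process list shown earlier.
process_list = [
    {"perplexity_filter": {"lang": "en", "max_ppl": 1500}},
    {"text_length_filter": {"min_len": 10, "max_len": 10000}},
]

stricter = override_op_args(process_list, "perplexity_filter", max_ppl=1000)
print(stricter[0])
print(process_list[0])  # the original list is unchanged
```

The returned list can then be handed to `load_ops` in place of `config_dict["process"]`.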

## Monitor Resources

In [17]:
# Check resource usage
import ray
from data_juicer.utils.ray_utils import ray_cpu_count, ray_gpu_count

ray.init(ignore_reinit_error=True)

print(f"Total CPUs: {ray_cpu_count()}")
print(f"Total GPUs: {ray_gpu_count()}")

2026-02-12 09:35:28,101	INFO worker.py:1821 -- Connecting to existing Ray cluster at address: 10.0.0.151:6379...
2026-02-12 09:35:28,101	INFO worker.py:1839 -- Calling ray.init() again after it has already been called.


Total CPUs: 4.0
Total GPUs: 0


## Ray Dashboard

Access Ray Dashboard at: `http://localhost:8265`

The dashboard provides:
- Real-time resource utilization
- Task execution timeline
- Memory usage statistics
- Error logs and debugging info

## Multi-Node Cluster Setup

In [18]:
print("Multi-node Ray cluster setup:")
print("""
# On head node:
ray start --head --port=6379 --num-cpus=8

# On worker nodes:
ray start --address='<head-node-ip>:6379' --num-cpus=8

# In Data-Juicer config:
executor_type: 'ray'
ray_address: '<head-node-ip>:6379'
""")

Multi-node Ray cluster setup:

# On head node:
ray start --head --port=6379 --num-cpus=8

# On worker nodes:
ray start --address='<head-node-ip>:6379' --num-cpus=8

# In Data-Juicer config:
executor_type: 'ray'
ray_address: '<head-node-ip>:6379'
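
Before pointing `ray_address` at a remote head node, it can help to confirm that the port is reachable from the machine that will run Data-Juicer. The sketch below uses only the Python standard library. `can_reach` is a hypothetical helper, and a successful TCP connection shows only that something is listening on that port, not that it is a healthy Ray head node.

```python
import socket


def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


# Replace this placeholder with your head node's address; 6379 matches the
# `--port` used in the commands above.
head_node_ip = "127.0.0.1"
print(f"Head node reachable: {can_reach(head_node_ip, 6379, timeout=1.0)}")
```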



## Try Deduplication Demo

Data-Juicer also provides a deduplication demo using Ray:

In [19]:
# View deduplication config
!cat demos/process_on_ray/configs/dedup.yaml

# Process config example for dataset

# global parameters
project_name: 'demo-dedup'
dataset_path: './demos/process_on_ray/data/'
export_path: './outputs/demo-dedup/demo-ray-bts-dedup-processed'

executor_type: 'ray'
ray_address: 'auto'

# process schedule
# a list of several process operators with their arguments
process:
  - ray_bts_minhash_deduplicator:
      tokenization: 'character'
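
The operator name suggests MinHash-based near-duplicate detection with character-level tokenization. To convey the general idea, here is a minimal pure-Python MinHash sketch. It is not Data-Juicer's implementation: the function names, the shingle length of 5, the 64 hash functions, and the use of seeded SHA-1 are all assumptions chosen for readability.

```python
import hashlib


def char_shingles(text, n=5):
    """Return the set of overlapping character n-grams in `text`."""
    if len(text) <= n:
        return {text}
    return {text[i:i + n] for i in range(len(text) - n + 1)}


def seeded_hash(shingle, seed):
    """Deterministic 64-bit hash; each seed acts as a separate hash function."""
    digest = hashlib.sha1(f"{seed}:{shingle}".encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def minhash_signature(text, num_hashes=64, n=5):
    """For each hash function, keep the minimum hash value over all shingles."""
    shingles = char_shingles(text, n)
    return [min(seeded_hash(s, seed) for s in shingles) for seed in range(num_hashes)]


def estimated_jaccard(sig_a, sig_b):
    """Fraction of matching signature positions, an estimate of Jaccard similarity."""
    matches = sum(a == b for a, b in zip(sig_a, sig_b))
    return matches / len(sig_a)


original = "JavaScript must be enabled to use the system"
near_duplicate = original + "!"
unrelated = "This paper proposed a novel method on LLM pretraining."

sig = minhash_signature(original)
print("near duplicate:", estimated_jaccard(sig, minhash_signature(near_duplicate)))
print("unrelated:     ", estimated_jaccard(sig, minhash_signature(unrelated)))
```

A deduplicator would then group texts whose estimated similarity exceeds a threshold. Production systems typically add locality-sensitive hashing so that not every pair of documents has to be compared.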

In [20]:
# check input directory
!ls -lh demos/process_on_ray/data

total 8.0K
-rw-rw-rw- 1 vscode vscode 3.4K Feb 12 09:30 demo-dataset.json
-rw-rw-rw- 1 vscode vscode 3.4K Feb 12 09:30 demo-dataset.jsonl


In [21]:
# Run deduplication
!dj-process --config demos/process_on_ray/configs/dedup.yaml

2026-02-12 09:35:47.492 | INFO     | data_juicer.config.config:695 - dataset_path config is set and a valid local path
2026-02-12 09:35:47,502	INFO worker.py:1821 -- Connecting to existing Ray cluster at address: 10.0.0.151:6379...
2026-02-12 09:35:47,517	INFO worker.py:1998 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265
2026-02-12 09:35:47.570 | INFO     | data_juicer.config.config:1012 - Back up the input config file [/workspaces/data-juicer-hub/data-juicer/demos/process_on_ray/configs/dedup.yaml] into the work_dir [/workspaces/data-juicer-hub/data-juicer/outputs/demo-dedup]
2026-02-12 09:35:47.575 | INFO     | data_juicer.config.config:1033 - Configuration table:
╒══════════════════════════╤════════════════════════════════════════════════════════════════════════════════════════════════════════════════╕
│ key  

In [22]:
# Check output directory
!ls -lh outputs/demo-dedup/demo-ray-bts-dedup-processed

total 16K
-rw-rw-rw- 1 vscode vscode  522 Feb 12 09:36 72_3506195790df42edb2e5de419f5e7bc6_000000_000000.json
-rw-rw-rw- 1 vscode vscode  783 Feb 12 09:36 72_3506195790df42edb2e5de419f5e7bc6_000001_000000.json
-rw-rw-rw- 1 vscode vscode  819 Feb 12 09:36 72_3506195790df42edb2e5de419f5e7bc6_000003_000000.json
-rw-rw-rw- 1 vscode vscode 1.3K Feb 12 09:36 72_3506195790df42edb2e5de419f5e7bc6_000006_000000.json


In [23]:
# View sample processed data
import os
import json
output_dir = 'outputs/demo-dedup/demo-ray-bts-dedup-processed'
try:
    sample_files = os.listdir(output_dir)
    print(f"Sample files count: {len(sample_files)}")
    for sample_file in sample_files:
        with open(os.path.join(output_dir, sample_file), 'r') as f:
            for i, line in enumerate(f):
                if i < 3:
                    print(json.dumps(json.loads(line), ensure_ascii=False))
except FileNotFoundError:
    print("Output directory not found")

Sample files count: 4
{"text": "What’s one thing you wish everyone knew about the brain?\nibble\nWhat’s one thing you wish everyone knew about the brain?\nThe place to have real conversations and understand each other better. Join a community or build and grow your own with groups, threads, and conversations.\nSee this content immediately after install\nGet The App\n"}
{"text": "JavaScript must be enabled to use the system\n"}
{"text": "中国企业又建成一座海外三峡工程!-科技-高清完整正版视频在线观看-优酷\n"}
{"text": "This paper proposed a novel method on LLM pretraining."}
{"text": "世界十大网投平台_2022年卡塔尔世界杯官网\n177-8228-4819\n网站首页\n关于我们\n产品展示\n广告牌制作 广告灯箱制作 标识牌制作 楼宇亮化工程 门头店招制作 不锈钢金属字制作 LED发光字制作 形象墙Logo墙背景墙制作 LED显示屏制作 装饰装潢工程 铜字铜牌制作 户外广告 亚克力制品 各类广告设计 建筑工地广告制作 楼顶大字制作|楼顶发光字制作 霓虹灯制作 三维扣板|3D扣板|广告扣板 房地产广告制作设计 精神堡垒|立牌|指示牌制作 大型商业喷绘写真 展览展示 印刷服务\n合作伙伴\n新闻资讯\n公司新闻 行业新闻 制作知识 设计知识\n成功案例\n技术园地\n联系方式\n"}
{"text": ".cv域名是因特网域名管理机构ICANN为佛得角共和国（The Republic of Cape Verde República de Cabo Verde）国家及地区分配的顶级域（ccTLD）,作为其国家及地区因特网顶级域名。- 奇典网络\n专业的互

## Performance Tips

Performance optimization tips for Ray processing:

1. **Shard Size**: Adjust `export_shard_size` based on dataset size
   - Smaller shards (100-1000): Better for fault tolerance
   - Larger shards (5000-10000): Better for throughput

2. **Caching**: Enable caching for repeated operations
   - `use_cache: true`
   - `cache_compress: 'gzip'`

3. **Operator Fusion**: Combine compatible operators
   - `op_fusion: true`

4. **Resource Allocation**: Match workers to available resources
   - CPU-bound ops: More workers
   - GPU-bound ops: Fewer workers with GPU allocation

5. **Monitoring**: Use Ray Dashboard at http://localhost:8265

## Cleanup

In [24]:
# Stop Ray cluster
# !ray stop

In [None]:
# Remove cloned Data-Juicer repository
# (step out of it first, since `%cd data-juicer` was run during setup)
%cd ..
!rm -rf data-juicer

## Further Reading

- [Distributed Processing Documentation](https://datajuicer.github.io/data-juicer/en/main/docs/Distributed.html)
- [Ray Documentation](https://docs.ray.io/)