Docs pipeline #512

Merged: 9 commits, Apr 9, 2024
16 changes: 15 additions & 1 deletion docs/sections/learn/cli.md
@@ -59,7 +59,7 @@ The pipeline information includes the steps used in the `Pipeline` along with th

### Running a Pipeline

We can also run a `Pipeline` from the CLI by pointing to the same `pipeline.yaml` file, or to a URL hosting it, and calling `distilabel pipeline run`:

```bash
$ distilabel pipeline run --help
...
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
```

To specify the runtime parameters of the steps, we need to use the `--param` option and provide each parameter as `step_name.parameter_name=value` (nested parameters are reached with additional dots), as in the following example:

```bash
distilabel pipeline run --config "https://huggingface.co/datasets/distilabel-internal-testing/ultrafeedback-mini/raw/main/pipeline.yaml" \
--param load_dataset.repo_id=HuggingFaceH4/instruction-dataset \
--param load_dataset.split=test \
--param text_generation_with_notus.generation_kwargs.max_new_tokens=512 \
--param text_generation_with_notus.generation_kwargs.temperature=0.7 \
--param text_generation_with_zephyr.generation_kwargs.max_new_tokens=512 \
--param text_generation_with_zephyr.generation_kwargs.temperature=0.7 \
--param ultrafeedback_overall_rating.generation_kwargs.max_new_tokens=1024 \
--param ultrafeedback_overall_rating.generation_kwargs.temperature=0.7
```

Again, this helps with the reproducibility of the results and simplifies sharing not only the final dataset but also the process used to generate it.
321 changes: 321 additions & 0 deletions docs/sections/learn/pipelines/index.md
@@ -0,0 +1,321 @@
# Pipeline

The [`Pipeline`][distilabel.pipeline.local.Pipeline] is the central point in `distilabel`: it's the way to organize the steps used to create your datasets. Up to this point we've seen how to define different [`Steps`](../steps/index.md) and [`Tasks`](../tasks/index.md), which together with an [`LLM`](.) are the building blocks of our datasets. In this section we will take a look at how all these blocks are organized inside a `Pipeline`.

!!! Note
    Currently `distilabel` implements a *local* version of a `Pipeline`, and this section assumes it's the only implementation available, but this may be extended in the future to include remote execution of the `Pipeline`.

## How to create a pipeline

The most common way to create a `Pipeline` is by using the context manager: we just need to give our `Pipeline` a **name** and, optionally, a **description**, and that's it[^1]:

```python
from distilabel.pipeline import Pipeline

with Pipeline("pipe-name", description="My first pipe") as pipeline:  # (1)
    ...

```

1. Use the context manager to create a `Pipeline` with a name and an optional description.

This way, we ensure all the steps we define there are connected with each other under the same `Pipeline`. The next step is to define the steps of our `Pipeline`. It's mandatory that the root steps of the pipeline, i.e. the ones that don't have any predecessors, are `GeneratorStep`s such as [`LoadDataFromDicts`](/distilabel/api/steps/generator_steps/generator_steps/#distilabel.steps.generators.data.LoadDataFromDicts) or [`LoadHubDataset`](/distilabel/api/steps/generator_steps/generator_steps/#distilabel.steps.generators.huggingface.LoadHubDataset).

```python
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadHubDataset

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")  # (1)
    ...

```

1. Define the first step of the pipeline, in this case `LoadHubDataset`, a `GeneratorStep` used to load a dataset from the Hugging Face Hub.
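
If the data lives in memory rather than on the Hub, another `GeneratorStep` such as `LoadDataFromDicts` can play the same role as root step. Below is a minimal sketch, assuming `LoadDataFromDicts` receives the rows via its `data` argument (the column name and example prompts are illustrative only):

```python
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromDicts

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    # A `GeneratorStep` that feeds the pipeline from an in-memory list of dicts
    load_dataset = LoadDataFromDicts(
        name="load_dataset",
        data=[{"prompt": "What's 2+2?"}, {"prompt": "Name three colors."}],
    )
    ...
```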

Once we have a source of data, we can create other `Step`s that will consume and process the data generated by the `GeneratorStep`s. Let's assume that the dataset we're going to load from the Hub contains a `prompt` column and that we want to generate texts based on this prompt. We also want to use several `LLM`s for this task. To do so, we will create several `TextGeneration` tasks, each with a different `LLM`.

```python
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)  # (1)
        load_dataset.connect(task)  # (2)

    ...
```

1. Create a `TextGeneration` task for each `LLM` we want to use.
2. Connect the `LoadHubDataset` step with the `TextGeneration` task, so the output data from the dataset is passed to the task.

!!! Note
    The execution order of the steps is determined by their connections. In this case, the `TextGeneration` tasks will be executed after the `LoadHubDataset` step.

For each row of the dataset, the `TextGeneration` task will generate a text based on the `instruction` column and the `LLM` model, and store the result (a single string) in a new column called `generation`. As we would like to have all the responses in the same column, we will add an extra step that combines them all, so the value of this column is a list of strings or responses.

```python
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    combine_generations = CombineColumns(  # (1)
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset.connect(task)
        task.connect(combine_generations)  # (2)
```

1. Create a `CombineColumns` step to combine all the `generation` columns into a single column called `generations` and the `model_name` columns into a single column called `model_names`.
2. Connect the `TextGeneration` task with the `CombineColumns` step, so the output data from the task is passed to the step that will combine all the `generation` columns.

As `CombineColumns` is the last step, or rather a leaf step of the pipeline because it doesn't have any successors, the outputs of this step will be included in the [`Distiset`](/distilabel/sections/learn/distiset) returned by the pipeline.

!!! Note
    One pipeline can have several leaf steps, which means that the outputs of all the leaf steps will be included in the returned `Distiset`, which will contain several subsets, one for each leaf step.
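
As a rough sketch of what this looks like once the pipeline is run (assuming, as described in the note above, that each leaf step ends up as a subset keyed by its name):

```python
if __name__ == "__main__":
    distiset = pipeline.run(...)
    # Inspect the subset produced by the `combine_generations` leaf step
    print(distiset["combine_generations"])
```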


!!! Warning
    Due to `multiprocessing`, the `pipeline.run` method **must** be called inside `__main__`:

    ```python
    with Pipeline("pipeline") as pipe:
        ...

    if __name__ == "__main__":
        distiset = pipe.run()
    ```

    Otherwise, an `EOFError` will be raised.

## Running the pipeline

Once we have created the pipeline, we can run it. To do so, we need to call the `run` method of the `Pipeline`, and specify the runtime parameters for each step:

```python
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "load_dataset": {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            "text_generation_with_gpt-4-0125-preview": {
                "generation_kwargs": {
                    "temperature": 0.7,
                    "max_new_tokens": 512,
                }
            },
            "text_generation_with_mistral-large-2402": {
                "generation_kwargs": {
                    "temperature": 0.7,
                    "max_new_tokens": 512,
                }
            },
            "text_generation_with_gemini-1.5-pro": {
                "generation_kwargs": {
                    "temperature": 0.7,
                    "max_new_tokens": 512,
                }
            },
        }
    )
```

But if we run it, we will see that the `run` method will fail:

```
ValueError: Step 'text_generation_with_gpt-4-0125-preview' requires inputs ['instruction'] which are not available when the step gets to be executed in the pipeline. Please make sure previous steps to 'text_generation_with_gpt-4-0125-preview' are
generating the required inputs. Available inputs are: ['prompt', 'completion', 'meta']
```

This is because, before actually running the pipeline, the pipeline is validated to verify that everything is correct and that all the steps in the pipeline are chainable, i.e. each step has the necessary inputs to be executed. In this case, the `TextGeneration` task requires the `instruction` column, but the `LoadHubDataset` step generates the `prompt` column. To solve this, we can use the `output_mappings` argument that every `Step` has, to map or rename the output columns of a step to the required input columns of another step:

```python
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},  # (1)
    )

    ...
```

1. Use the `output_mappings` argument to map the `prompt` column generated by the `LoadHubDataset` step to the `instruction` column required by the `TextGeneration` task.

If we execute the pipeline again, it will run successfully and we will have a `Distiset` with the outputs of all the leaf steps of the pipeline, which we can push to the Hugging Face Hub.

```python
if __name__ == "__main__":
    distiset = pipeline.run(...)
    distiset.push_to_hub("distilabel-internal-testing/instruction-dataset-mini-with-generations")
```
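
Once pushed, the generated data could be pulled back with the `datasets` library. A rough sketch, assuming each leaf step is pushed as its own configuration and ends up in a `train` split:

```python
from datasets import load_dataset

# Load the subset produced by the `combine_generations` leaf step
# (the configuration name and split are assumptions, see the note above)
generations = load_dataset(
    "distilabel-internal-testing/instruction-dataset-mini-with-generations",
    "combine_generations",
    split="train",
)
```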

## Cache

If we try to execute the pipeline again, it won't actually re-run: it will load the dataset from the cache, and the outputs of the pipeline will be the same as in the previous run. If, for some reason, we decide to stop the pipeline execution in the middle of the process by pressing CTRL + C, the pipeline will stop and the state of the pipeline and the outputs will be stored in the cache, so we can resume the execution from the point where it was stopped.

If we want to force the pipeline to run again, then we can use the `use_cache` argument of the `run` method and set it to `False`:

```python
if __name__ == "__main__":
    distiset = pipeline.run(parameters={...}, use_cache=False)
```

## Adjusting the batch size for each step

It's very likely that in some pipelines the batch size of the steps (the number of dictionaries that every `Step.process` method will receive when called) will need to be adjusted in order to avoid memory issues or to achieve more efficient processing. To do so, we can use the `input_batch_size` argument when creating the step:

```python
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(
            name=f"text_generation_with_{llm.model_name}",
            llm=llm,
            input_batch_size=5,  # (1)
        )

    ...
```

1. Use the `input_batch_size` argument to set the batch size of the `TextGeneration` task to 5.

When we run the pipeline, the `TextGeneration` task will receive 5 dictionaries in every call to the `process` method. In addition, we can also adjust the size of the batches generated by the `GeneratorStep`s using the `batch_size` argument:

```python
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
        batch_size=10,  # (1)
    )

    ...
```

1. Use the `batch_size` argument to set the batch size of the `LoadHubDataset` step to 10.

By default, both arguments have a value of `50`.

## Serializing the pipeline

Sharing a pipeline with others is very easy, as we can serialize the pipeline object using the `save` method. We can save the pipeline in different formats, such as `yaml` or `json`:

```python
if __name__ == "__main__":
    pipeline.save("pipeline.yaml", format="yaml")
```

To load the pipeline, we can use the `from_yaml` or `from_json` methods:

```python
pipeline = Pipeline.from_yaml("pipeline.yaml")
```
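
The `json` case works analogously; a quick sketch, assuming the `format` argument and `from_json` mirror the `yaml` behaviour shown above:

```python
if __name__ == "__main__":
    pipeline.save("pipeline.json", format="json")

pipeline = Pipeline.from_json("pipeline.json")
```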

Serializing the pipeline is very useful when we want to share it with others, or when we want to store it for future use. It can even be hosted online, so the pipeline can be executed directly from the [CLI](/distilabel/sections/learn/cli) just by knowing its URL.
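
For instance, if the serialized `pipeline.yaml` is hosted somewhere reachable, it could be run along the lines of the following sketch (the URL below is just a placeholder, and the runtime parameters depend on the steps of the serialized pipeline):

```bash
distilabel pipeline run --config "https://example.com/path/to/pipeline.yaml" \
    --param load_dataset.repo_id=distilabel-internal-testing/instruction-dataset-mini \
    --param load_dataset.split=test
```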

## Fully working example

To sum up, here is the full code of the pipeline we have created in this section. Note that you will need to change the name of the Hugging Face repository where the resulting dataset will be pushed, set the `OPENAI_API_KEY` environment variable, set the `MISTRAL_API_KEY` environment variable and have `gcloud` installed and configured:

??? Code
    ```python
    from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
    from distilabel.pipeline import Pipeline
    from distilabel.steps import CombineColumns, LoadHubDataset
    from distilabel.steps.tasks import TextGeneration

    with Pipeline("pipe-name", description="My first pipe") as pipeline:
        load_dataset = LoadHubDataset(
            name="load_dataset",
            output_mappings={"prompt": "instruction"},
        )

        combine_generations = CombineColumns(
            name="combine_generations",
            columns=["generation", "model_name"],
            output_columns=["generations", "model_names"],
        )

        for llm in (
            OpenAILLM(model="gpt-4-0125-preview"),
            MistralLLM(model="mistral-large-2402"),
            VertexAILLM(model="gemini-1.0-pro"),
        ):
            task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
            load_dataset.connect(task)
            task.connect(combine_generations)

    if __name__ == "__main__":
        distiset = pipeline.run(
            parameters={
                "load_dataset": {
                    "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                    "split": "test",
                },
                "text_generation_with_gpt-4-0125-preview": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                },
                "text_generation_with_mistral-large-2402": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                },
                "text_generation_with_gemini-1.0-pro": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
        )
        distiset.push_to_hub(
            "distilabel-internal-testing/instruction-dataset-mini-with-generations"
        )
    ```

[^1]: We can also pass a *cache_dir* argument; for more information on this parameter, we refer the reader to the [caching](../caching.md) section.
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -117,6 +117,7 @@ nav:
- LLMs:
  - "sections/learn/llms/index.md"
- Pipeline:
  - "sections/learn/pipelines/index.md"
  - "sections/learn/pipelines/base.md"
  - "sections/learn/pipelines/local.md"
- Command Line Interface: