Add Step abstract class and new Pipeline #338

Merged: gabrielmbmb merged 8 commits into core-refactor from add_pipeline_dag on Feb 14, 2024

gabrielmbmb (Member) commented Feb 8, 2024

Description

This PR adds the new Step abstract base class and the new Pipeline, which together allow building more complex distilabel pipelines defined as a DAG. It only adds the logic needed to add and connect steps, and to validate that the resulting pipeline is well-formed and can be executed.

Some validations are still missing, such as checking that non-optional runtime parameters have been provided in the pipeline's run method. I'll add those in a separate PR, as this one is already quite big.

from typing import Any, Dict, Generator, List

from distilabel.pipeline.local import Pipeline
from distilabel.step.base import GeneratorStep, Step, StepInput


class LoadHubDataset(GeneratorStep):
    def process(
        self, repo_id: str, split: str
    ) -> Generator[List[Dict[str, Any]], None, None]:
        yield []

    @property
    def outputs(self) -> List[str]:
        return ["instruction"]


class GenerateResponse(Step):
    @property
    def inputs(self) -> List[str]:
        return ["instruction"]

    def process(self, inputs: StepInput) -> Generator[List[Dict[str, Any]], None, None]:
        yield []

    @property
    def outputs(self) -> List[str]:
        return ["response"]


class EvolResponse(Step):
    @property
    def inputs(self) -> List[str]:
        return ["response"]

    def process(
        self, *inputs: StepInput
    ) -> Generator[List[Dict[str, Any]], None, None]:
        yield []

    @property
    def outputs(self) -> List[str]:
        return ["evol_response"]


with Pipeline() as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")
    generate_response = GenerateResponse(name="generate_response")
    evol_response = EvolResponse(name="evol_response")

    load_dataset.connect(generate_response)
    generate_response.connect(evol_response)


pipeline.run(
    configuration={
        "load_dataset": {
            "repo_id": "HuggingFaceH4/instruction-dataset",
            "split": "test",
        }
    }
)
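For readers who want a feel for what "validating that the resulting pipeline is valid" can involve, here is a minimal standalone sketch. It is not the code added in this PR: `StepSpec` and `validate_dag` are hypothetical names, and the two checks shown (the graph has no cycles, and every step's declared inputs are produced somewhere upstream) are an assumption about what such a validation might cover. It uses only the standard library (`graphlib`, Python 3.9+).

```python
from dataclasses import dataclass, field
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set


@dataclass
class StepSpec:
    """Hypothetical stand-in for a step: its name and declared columns."""

    name: str
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)


def validate_dag(steps: List[StepSpec], edges: Dict[str, List[str]]) -> List[str]:
    """Return step names in execution order, or raise ValueError.

    `edges` maps a step name to the names of the steps it feeds.
    """
    by_name = {step.name: step for step in steps}

    # TopologicalSorter expects a mapping of node -> set of predecessors.
    predecessors: Dict[str, Set[str]] = {name: set() for name in by_name}
    for source, targets in edges.items():
        for target in targets:
            if source not in by_name or target not in by_name:
                raise ValueError(f"Unknown step in edge {source!r} -> {target!r}")
            predecessors[target].add(source)

    # Check 1: the graph must be acyclic.
    try:
        order = list(TopologicalSorter(predecessors).static_order())
    except CycleError as err:
        raise ValueError(f"Pipeline contains a cycle: {err.args[1]}") from err

    # Check 2: each step's inputs must be produced by some upstream step.
    available: Dict[str, Set[str]] = {}
    for name in order:
        upstream: Set[str] = set()
        for pred in predecessors[name]:
            upstream |= available[pred]
        missing = set(by_name[name].inputs) - upstream
        if missing:
            raise ValueError(f"Step {name!r} is missing inputs: {sorted(missing)}")
        available[name] = upstream | set(by_name[name].outputs)
    return order


# Mirrors the three-step example from the PR description.
steps = [
    StepSpec("load_dataset", outputs=["instruction"]),
    StepSpec("generate_response", inputs=["instruction"], outputs=["response"]),
    StepSpec("evol_response", inputs=["response"], outputs=["evol_response"]),
]
edges = {
    "load_dataset": ["generate_response"],
    "generate_response": ["evol_response"],
}
order = validate_dag(steps, edges)
```

In this sketch the column check treats everything produced upstream as available, so a step can consume a column emitted several steps earlier. Whether the actual Pipeline is that permissive is not stated in this PR description.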

gabrielmbmb added the enhancement (New feature or request) label Feb 8, 2024
gabrielmbmb added this to the 1.0.0 milestone Feb 8, 2024
gabrielmbmb self-assigned this Feb 8, 2024
gabrielmbmb changed the title from "Add logic for adding steps, edges and validating DAG" to "Add Step abstract class and new Pipeline" Feb 14, 2024
gabrielmbmb marked this pull request as ready for review February 14, 2024 14:48
gabrielmbmb merged commit af3b557 into core-refactor Feb 14, 2024
4 checks passed
gabrielmbmb deleted the add_pipeline_dag branch February 14, 2024 15:24
1 participant