In [1]:
def pipeline():
    registered_steps = []
    steps_dict = {}
    dependencies = {} 

    def step(func=None, *, depends_on=None):
        if func is None:
            def decorator(f):
                return step(f, depends_on=depends_on)
            return decorator
        func_name = func.__name__
        registered_steps.append(func_name)
        steps_dict[func_name] = func
        dependencies[func_name] = depends_on or []

        def get_dependencies():
            return dependencies[func_name]
        func.get_dependencies = get_dependencies

        from functools import wraps
        @wraps(func)
        def wrapper_function():
            executed = set()
            def execute_step(step_name):
                if step_name not in executed:
                    for dep in dependencies.get(step_name, []):
                        execute_step(dep)
                    steps_dict[step_name]()
                    executed.add(step_name)
            execute_step(func_name)
        wrapper_function.get_dependencies = func.get_dependencies
        return wrapper_function

    def get_all():
        return registered_steps.copy()
    step.get_all = get_all

    from graphviz import Digraph
    def graph():
        dot = Digraph(comment='Pipeline Graph')
        for step_name in registered_steps:
            dot.node(step_name, step_name)
        for step_name in registered_steps:
            for dep in dependencies[step_name]:
                dot.edge(dep, step_name)
        return dot
    step.graph = graph

    return step

step = pipeline()

@step
def collect_data():
    print("collect data")

@step(depends_on=["collect_data"])
def preprocess_data():
    print("preprocess data")

@step(depends_on=["collect_data"])
def modify_data():
    print("modify data")

@step(depends_on=["modify_data"])
def extract_features():
    print("extract features")

@step(depends_on=["extract_features"])
def filter_features():
    print("filter features")

@step(depends_on=["preprocess_data", "modify_data"])
def merge_data():
    print("merge data")

@step(depends_on=["merge_data", "filter_features"])
def handle_data():
    print("handle data")

print(step.get_all())

print(collect_data.get_dependencies())

print(preprocess_data.get_dependencies())

filter_features()

handle_data()

dot = step.graph()
dot.render('pipeline_graph', view=True) 


['collect_data', 'preprocess_data', 'modify_data', 'extract_features', 'filter_features', 'merge_data', 'handle_data']
[]
['collect_data']
collect data
modify data
extract features
filter features
collect data
preprocess data
modify data
merge data
extract features
filter features
handle data


ExecutableNotFound: failed to execute WindowsPath('dot'), make sure the Graphviz executables are on your systems' PATH