# Pipeline 

> A `Pipeline` runs a sequence of notebooks as steps, each wrapped in a `StepRunner`. See: `StepRunner`.

In [None]:
#| default_exp pipeline 

In [None]:
#| export

from __future__ import annotations

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export

import logging
from typing import List

from colorama import Style
from tqdm.notebook import tqdm

from stdflow import StepRunner
from stdflow.stdflow_utils.bt_print import print_header


In [None]:
#| export

# Module-level logger: only warnings and above are emitted by this module.
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)

# Install the default root handler (no-op if the root logger is already configured).
logging.basicConfig()


In [None]:
#| export

class Pipeline:
    """Create pipeline of notebooks with optional variables.

    A pipeline is an ordered list of `StepRunner` objects stored in `self.steps`.
    Steps are executed in insertion order by `run` (or by calling the pipeline).
    """
    def __init__(self, steps: List[StepRunner] | StepRunner = None, *args):
        """Build a pipeline from a list of steps, a single step, and/or extra positional steps.

        `Pipeline([a, b])`, `Pipeline(a, b)` and `Pipeline([a], b)` are all accepted.
        The sequence passed as `steps` is copied, so it is never modified by the pipeline.
        """
        if steps is None:
            collected = []
        elif isinstance(steps, (list, tuple)):
            # Copy: extending the caller's own list in place would leak `args` into it.
            collected = list(steps)
        elif isinstance(steps, StepRunner):
            collected = [steps]
        else:
            # Any other iterable of steps; falsy values mean "no steps".
            collected = list(steps) if steps else []
        collected.extend(args)
        self.steps: List[StepRunner] = collected

    def verify(self) -> bool:
        """Verify that all steps are valid.

        Returns `True` when every step's `is_valid()` is truthy (also for an empty
        pipeline). Checking stops at the first invalid step.
        """
        return all(step.is_valid() for step in self.steps)

    def add_step(
            self, 
            step: StepRunner | str = None, # StepRunner or path to notebook
            **kwargs # kwargs to pass to StepRunner
    ):
        """Add step to pipeline.

        When `step` is a string it is used as `file_path` and a `StepRunner` is built
        from it together with `kwargs`. Returns the pipeline itself so calls can be
        chained. Raises `ValueError` when no step is given.
        """
        if step is None:
            # Fail here rather than later in `run`/`verify` with an opaque AttributeError.
            raise ValueError("add_step requires a StepRunner or a path to a notebook")
        if isinstance(step, str):
            kwargs["file_path"] = step
            step = StepRunner(**kwargs)
        self.steps.append(step)
        return self

    def run(
            self, 
            progress_bar: bool = False,  # Whether to show progress bar
            **kwargs  # kwargs to pass to StepRunner.run
    ):
        """Run pipeline.

        For each step, in order: print a header and the step's variables, then call
        `step.run(verbose=False, **kwargs)`. An empty pipeline is a no-op.
        """
        if not self.steps:
            # Nothing to run; also avoids `max()` failing on an empty sequence.
            return

        # Width of the longest path, used by print_header to align the headers.
        longest_worker_path_adjusted = max(len(step.worker_path) for step in self.steps)
        min_blank = 10

        bar = None
        if progress_bar:
            try:
                # Wrap the list itself (not enumerate) so tqdm knows the total.
                bar = tqdm(self.steps, desc="Pipeline")
            except ImportError as e:
                # Fall back to running without a progress bar.
                logger.warning("Could not use tqdm. %s", e.msg)

        iterable = self.steps if bar is None else bar
        for i, step in enumerate(iterable):
            if bar is not None:
                bar.set_description_str(f"Pipeline: {step.worker_path}")

            print_header(step.worker_path, i, longest_worker_path_adjusted, min_blank)
            print(f"Variables: {step.env_vars}")
            # Run step
            step.run(verbose=False, **kwargs)

            print("", end="\n\n")

    def __call__(
            self,
            progress_bar: bool = False,  # Whether to show progress bar
            **kwargs  # kwargs to pass to StepRunner.run
    ):
        "Run pipeline (shorthand for `run`)"
        self.run(
            progress_bar=progress_bar,
            **kwargs
        )

    def __str__(self):
        "Human-readable summary: a banner followed by each step's path and variables."
        bold, reset = Style.BRIGHT, Style.RESET_ALL
        rule = "================================"
        title = "            PIPELINE            "

        parts = [f"{bold}\n{rule}\n{title}\n{rule}\n\n{reset}"]
        parts.extend(
            f"{bold}STEP {i+1}{reset}\n"
            f"\tpath: {step.worker_path}\n"
            f"\tvars: {step.env_vars}\n\n"
            for i, step in enumerate(self.steps)
        )
        parts.append(f"{bold}{rule}{reset}\n")
        return "".join(parts)

    def __repr__(self):
        "Same text as `__str__`, so the summary is shown when the object is displayed."
        return str(self)


In [None]:
# Render the API documentation for Pipeline.add_step (output shown below).
show_doc(Pipeline.add_step)

---

[source](https://github.com/CyprienRicque/stdflow/blob/main/stdflow/pipeline.py#L41){target="_blank" style="float:right; font-size:smaller"}

### Pipeline.add_step

>      Pipeline.add_step (step:Union[stdflow.step_runner.StepRunner,str]=None,
>                         **kwargs)

Add step to pipeline

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| step | StepRunner \| str | None | StepRunner or path to notebook |
| kwargs |  |  |  |

In [None]:
# Render the API documentation for Pipeline.verify (output shown below).
show_doc(Pipeline.verify)

---

[source](https://github.com/CyprienRicque/stdflow/blob/main/stdflow/pipeline.py#L34){target="_blank" style="float:right; font-size:smaller"}

### Pipeline.verify

>      Pipeline.verify ()

Verify that all steps are valid

In [None]:
# Render the API documentation for Pipeline.run (output shown below).
show_doc(Pipeline.run)

---

[source](https://github.com/CyprienRicque/stdflow/blob/main/stdflow/pipeline.py#L53){target="_blank" style="float:right; font-size:smaller"}

### Pipeline.run

>      Pipeline.run (progress_bar:bool=False, **kwargs)

Run pipeline

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| progress_bar | bool | False | Whether to show progress bar |
| kwargs |  |  |  |

In [None]:
# Render the API documentation for Pipeline.__call__ (output shown below).
show_doc(Pipeline.__call__)

---

[source](https://github.com/CyprienRicque/stdflow/blob/main/stdflow/pipeline.py#L82){target="_blank" style="float:right; font-size:smaller"}

### Pipeline.__call__

>      Pipeline.__call__ (progress_bar:bool=False, **kwargs)

Run pipeline

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| progress_bar | bool | False | Whether to show progress bar |
| kwargs |  |  |  |

In [None]:
#| hide
    
# Smoke test: build a one-step pipeline from a notebook path and execute it.
# add_step returns the pipeline, so construction and registration can be chained.
ppl = Pipeline().add_step(
    step="../demo_project/notebooks/_00_experiments/check_hello.ipynb",
    variables={"hello": "world!"},
)
ppl.run()


[1m    01.          ../demo_project/notebooks/_00_experiments/check_hello.ipynb[0m
Variables: {'hello': 'world!'}
	Path: check_hello.ipynb
	Duration: 0 days 00:00:00.542960
	Env: {'hello': 'world!'}
[1m[32mNotebook executed successfully.[0m




In [None]:
#| hide

from itertools import product

# Values passed to the notebooks as step variables (see the loop in the next cell).
countries = ["india", "indonesia"]
targets = ["meta_impressions", "yt_impressions"]

# Notebooks to register as steps, in order; commented-out entries are disabled.
files = [
    "1_feature_eng_platform_focus.ipynb",
    "2_feature_eng_blanket.ipynb",
    "3_base_feature_selection.ipynb",
    # "4_feature_eng_linear_transformation.ipynb",
    # "5_feature_selection.ipynb",
    # "6_manual_feature_selection.ipynb",
    # "7_lrl_comp_split.ipynb",
]

# Every notebook is repeated per country (this is an alias of `files`, not a copy).
run_with_countries = files
# Notebooks that are additionally repeated per target.
run_with_targets = [
    "3_base_feature_selection.ipynb",
    # "4_feature_eng_linear_transformation.ipynb",
    # "5_feature_selection.ipynb",
    # "6_manual_feature_selection.ipynb",
]

# Fresh, empty pipeline that the next cell fills with steps.
ppl = Pipeline()



In [None]:
#| hide


# Register one step per (country, target) combination that applies to each notebook.
for file in files:
    # An axis that does not apply collapses to a single None so product() still yields once.
    country_axis = countries if file in run_with_countries else [None]
    target_axis = targets if file in run_with_targets else [None]

    for country, target in product(country_axis, target_axis):
        env = {"country": country}
        # Only notebooks that vary by target receive a "target" variable.
        if target:
            env["target"] = target
        ppl.add_step(StepRunner(file, variables=env))

# Display the resulting pipeline (uses Pipeline.__repr__).
ppl

[1m
            PIPELINE            

[0m[1mSTEP 1[0m
	path: ./1_feature_eng_platform_focus.ipynb
	vars: {'country': 'india'}

[1mSTEP 2[0m
	path: ./1_feature_eng_platform_focus.ipynb
	vars: {'country': 'indonesia'}

[1mSTEP 3[0m
	path: ./2_feature_eng_blanket.ipynb
	vars: {'country': 'india'}

[1mSTEP 4[0m
	path: ./2_feature_eng_blanket.ipynb
	vars: {'country': 'indonesia'}

[1mSTEP 5[0m
	path: ./3_base_feature_selection.ipynb
	vars: {'country': 'india', 'target': 'meta_impressions'}

[1mSTEP 6[0m
	path: ./3_base_feature_selection.ipynb
	vars: {'country': 'india', 'target': 'yt_impressions'}

[1mSTEP 7[0m
	path: ./3_base_feature_selection.ipynb
	vars: {'country': 'indonesia', 'target': 'meta_impressions'}

[1mSTEP 8[0m
	path: ./3_base_feature_selection.ipynb
	vars: {'country': 'indonesia', 'target': 'yt_impressions'}


In [None]:
#| hide

# Print the pipeline summary produced by Pipeline.__str__.
print(ppl)


[1m
            PIPELINE            

[0m[1mSTEP 1[0m
	path: ./1_feature_eng_platform_focus.ipynb
	vars: {'country': 'india'}

[1mSTEP 2[0m
	path: ./1_feature_eng_platform_focus.ipynb
	vars: {'country': 'indonesia'}

[1mSTEP 3[0m
	path: ./2_feature_eng_blanket.ipynb
	vars: {'country': 'india'}

[1mSTEP 4[0m
	path: ./2_feature_eng_blanket.ipynb
	vars: {'country': 'indonesia'}

[1mSTEP 5[0m
	path: ./3_base_feature_selection.ipynb
	vars: {'country': 'india', 'target': 'meta_impressions'}

[1mSTEP 6[0m
	path: ./3_base_feature_selection.ipynb
	vars: {'country': 'india', 'target': 'yt_impressions'}

[1mSTEP 7[0m
	path: ./3_base_feature_selection.ipynb
	vars: {'country': 'indonesia', 'target': 'meta_impressions'}

[1mSTEP 8[0m
	path: ./3_base_feature_selection.ipynb
	vars: {'country': 'indonesia', 'target': 'yt_impressions'}




In [None]:
#| hide


In [None]:
#| hide

In [None]:
#| hide


In [None]:
#| hide
# Export the cells marked `#| export` to the Python module.
import nbdev

nbdev.nbdev_export()