# Creating a new pipeline from scratch

## Overview

In algo features, all pipelines are built using the `pipelines` library and the `actions` SDK.

A `pipeline` consists of 3 components:
1. The `pipeline` name.
2. `Tasks` that the pipeline will run.
3. An optional `trigger` for the pipeline.

## Creating a pipeline

To create a pipeline, import the `pipeline` module from the `pipelines` library and initialize a `Pipeline` object with a name, as seen below.

Once a pipeline has been created, you can show it using `.show()` and run it using `.run()`. In this case our pipeline has no tasks, so there is nothing to show or run.

In [1]:
from src.pipelines import pipeline

example_pipeline = pipeline.Pipeline(name='example_pipeline')


In [2]:
# Nothing will show because we have no tasks in our pipeline
example_pipeline.show()

Nothing to show! No tasks have been added to the pipeline.
	Try adding tasks to the pipeline using pipeline.add_task()


In [3]:
# Nothing will run because we have no tasks in our pipeline
example_pipeline.run()

## Adding tasks to a pipeline

A `task` in a pipeline consists of 5 components:
* name: the name of the task
* action: the action the task runs (see below)
* parameters: the parameters the action requires
* depends_on: list of the tasks this task depends on (they run before it)
* src: path to the source code for the action

Of these 5 components, when using the `pipeline` object, you only have to provide __name, action, and parameters__.
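
The five components above can be pictured as a small data class. This is only an illustrative sketch: `TaskSketch` and `PipelineSketch` are hypothetical names, not the real classes in `src.pipelines`, and the defaults for `depends_on` and `src` are an assumption about why those two fields do not have to be provided. The `hello` here returns its greeting instead of printing it so the result is easy to inspect.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class TaskSketch:
    """Hypothetical stand-in for a pipeline task (not the real class)."""

    name: str
    action: Callable[..., Any]
    parameters: dict[str, Any]
    # Assumed defaults: the pipeline object is expected to fill these in.
    depends_on: list[str] = field(default_factory=list)
    src: Optional[str] = None


@dataclass
class PipelineSketch:
    """Hypothetical stand-in for a pipeline: name, tasks, optional trigger."""

    name: str
    tasks: list[TaskSketch] = field(default_factory=list)
    trigger: Optional[Any] = None


def hello(name: str) -> str:
    # Variant of the tutorial's action that returns instead of printing.
    return f"hello {name}!"


task = TaskSketch(name="hello_peacock", action=hello, parameters={"name": "Peacock"})
sketch = PipelineSketch(name="example_pipeline", tasks=[task])

# Running a task amounts to calling its action with its parameters.
result = task.action(**task.parameters)
```

Only `name`, `action`, and `parameters` are passed explicitly; the other fields fall back to their defaults.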

To create a `task`, we will first need to create an `action`.

## Defining a new action

In algo features, `actions` are just Python functions.

To define a new action, create a Python function in any module.

Here we define the function in this notebook and then write it to a module, because actions that exist only in a notebook are not currently supported. Typically, you would import a function from an existing module, as the examples that follow do with `src.actions`.

In [4]:
# example action
def hello(name: str) -> None:
    print(f'hello {name}!')
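
A task records `src`, a path to the action's source code, which is one plausible reason an action needs to live in a module. The sketch below uses the standard library's `inspect` module to find the directory a function was defined in. `action_location` is a hypothetical helper, and whether the `pipelines` library resolves `src` this way is an assumption.

```python
import inspect
import json
import os
from typing import Any, Callable


def action_location(action: Callable[..., Any]) -> tuple[str, str]:
    """Return (directory of the defining source file, function name)."""
    source_file = inspect.getsourcefile(action)
    if source_file is None:
        raise ValueError(f"{action!r} has no source file")
    return os.path.dirname(os.path.abspath(source_file)), action.__name__


# json.dumps is a plain Python function defined in the standard
# library's json package, so it has a real source file on disk.
src_dir, action_name = action_location(json.dumps)
```

The pair `(src_dir, action_name)` has the same shape as the `src` and `action` fields that appear in the compiled YAML later in this notebook.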

## Using actions in tasks

Suppose we want to build a pipeline that runs the `action` `hello` with the input `name='Peacock'` and then runs `hello` again with the input `name='World'`.

In a script, it may look something like this:

In [5]:
from src.actions import actions

actions.hello(name='Peacock')
actions.hello(name='World')




hello Peacock!
hello World!


To add the first `action` to our `pipeline`, we convert it to a `task` using the `add_task()` method. 

In [6]:
example_pipeline = pipeline.Pipeline(name='example_pipeline')

example_pipeline.add_task(
    name='hello_peacock', action=actions.hello, parameters={'name': 'Peacock'}
)

# When we show our pipeline, it now has one node.
example_pipeline.show()


In [7]:
# When we run our pipeline, it says "hello Peacock!"
example_pipeline.run()

hello Peacock!


To add the second `task` to our `pipeline`, we use the same method but include the `after='hello_peacock'` parameter so that this `task` runs after the previous one.

__Note:__ there is also a `before` parameter if you want a `task` to run before another.

In [8]:
example_pipeline = pipeline.Pipeline(name='example_pipeline')

example_pipeline.add_task(
    name='hello_peacock', action=actions.hello, parameters={'name': 'Peacock'}
)
example_pipeline.add_task(
    name='hello_world',
    action=actions.hello,
    parameters={'name': 'World'},
    after='hello_peacock',
)

# When we show our pipeline, it now has two nodes.
example_pipeline.show()

In [9]:
example_pipeline.run()

hello Peacock!
hello World!
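
To make the effect of `after` and `before` concrete, here is a minimal sketch of how they could translate into `depends_on` entries and a run order. `MiniPipeline` is a hypothetical, simplified class, not the real `pipeline.Pipeline`, and the ordering uses `graphlib.TopologicalSorter` from the standard library (Python 3.9+), which may differ from what the library does internally.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Optional


class MiniPipeline:
    """Hypothetical, simplified pipeline; not the real src.pipelines class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.actions: dict[str, tuple[Callable[..., Any], dict[str, Any]]] = {}
        self.depends_on: dict[str, list[str]] = {}

    def add_task(
        self,
        name: str,
        action: Callable[..., Any],
        parameters: dict[str, Any],
        after: Optional[str] = None,
        before: Optional[str] = None,
    ) -> None:
        self.actions[name] = (action, parameters)
        self.depends_on.setdefault(name, [])
        if after is not None:
            # The new task depends on the task it comes after.
            self.depends_on[name].append(after)
        if before is not None:
            # The existing task now depends on the new task.
            self.depends_on.setdefault(before, []).append(name)

    def run(self) -> list[Any]:
        # static_order() yields each task only after its dependencies.
        results = []
        for task_name in TopologicalSorter(self.depends_on).static_order():
            action, parameters = self.actions[task_name]
            results.append(action(**parameters))
        return results


def hello(name: str) -> str:
    # Returns the greeting (instead of printing) so it can be inspected.
    return f"hello {name}!"


mini = MiniPipeline(name="example_pipeline")
mini.add_task(name="hello_world", action=hello, parameters={"name": "World"})
mini.add_task(
    name="hello_peacock",
    action=hello,
    parameters={"name": "Peacock"},
    before="hello_world",
)
outputs = mini.run()
```

Here the tasks are added in the opposite order and `before='hello_world'` is used instead of `after`, yet the dependency graph and the run order are the same as in the two-task pipeline above.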


## Compiling your pipeline

The true value of the `pipelines` library comes from its ability to compile `pipelines` to different languages or platform-specific code.

For the full list of adapters we currently support, see `src.pipelines.adapters.Adapters`.

For example, we store all `pipelines` in algo features in yaml. To compile a `pipeline` to yaml, you can use the `YAML` adapter.

### Compiling to yaml

In [10]:
from src.pipelines import adapters

yaml = example_pipeline.compile(adapter=adapters.Adapters.YAML)
print(yaml)

port: pipeline
name: example_pipeline
trigger: null
tasks:
- name: hello_peacock
  src: /Users/a206691930/Documents/GitHub/src-algo-features/src/actions
  action: hello
  parameters:
    name: Peacock
  depends_on: []
- name: hello_world
  src: /Users/a206691930/Documents/GitHub/src-algo-features/src/actions
  action: hello
  parameters:
    name: World
  depends_on:
  - hello_peacock



### Decompiling from yaml
The `YAML` adapter also supports decompiling a yaml artifact back into a pipeline.


In [11]:
decompiled_example = pipeline.Pipeline.decompile(artifact=yaml, adapter=adapters.Adapters.YAML)
decompiled_example.show()

In [12]:
decompiled_example.run()

hello Peacock!
hello World!


In [13]:
example_pipeline == decompiled_example

True
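
The equality check works because a pipeline is fully described by plain data: names, parameters, and dependencies. The sketch below shows the same round trip with hypothetical `TaskSpec` and `PipelineSpec` data classes. It uses JSON from the standard library as a stand-in for yaml so that it runs without extra dependencies, and it stores each action by name only; both choices are assumptions made for illustration.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class TaskSpec:
    """Hypothetical task record; the action is stored by name only."""

    name: str
    action: str
    parameters: dict
    depends_on: list = field(default_factory=list)


@dataclass
class PipelineSpec:
    """Hypothetical pipeline record with a JSON round trip."""

    name: str
    tasks: list = field(default_factory=list)

    def compile(self) -> str:
        # asdict() recursively converts the nested TaskSpec objects to dicts.
        return json.dumps(asdict(self), indent=2)

    @classmethod
    def decompile(cls, artifact: str) -> "PipelineSpec":
        data = json.loads(artifact)
        tasks = [TaskSpec(**task) for task in data["tasks"]]
        return cls(name=data["name"], tasks=tasks)


original = PipelineSpec(
    name="example_pipeline",
    tasks=[
        TaskSpec(name="hello_peacock", action="hello", parameters={"name": "Peacock"}),
        TaskSpec(
            name="hello_world",
            action="hello",
            parameters={"name": "World"},
            depends_on=["hello_peacock"],
        ),
    ],
)

restored = PipelineSpec.decompile(original.compile())
# Data classes compare field by field, so the round trip preserves equality.
round_trip_ok = restored == original
```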

## Add a schedule to the pipeline

You can add a cron schedule to your `pipeline` by adding a `trigger`.

In [14]:
from src.pipelines import trigger

example_pipeline.trigger = trigger.CronTrigger(schedule='0 12 * * *', start_date='2024-01-01')
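
The `schedule` string follows the standard five-field cron layout: minute, hour, day of month, month, day of week. `0 12 * * *` therefore means minute 0 of hour 12, every day. The helper below simply labels the fields; `describe_cron` is a hypothetical function for illustration and is not part of the `trigger` module.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(schedule: str) -> dict[str, str]:
    """Split a five-field cron expression into named fields."""
    parts = schedule.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 cron fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


fields = describe_cron("0 12 * * *")
```

A `*` in a field means "every value", so only the minute and hour are pinned in this schedule.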

## Compile to Airflow DAG

You can also compile to other platform-specific targets, such as Airflow, Databricks, and Mermaid.

In [15]:
dag = example_pipeline.compile(adapter=adapters.Adapters.AIRFLOW)
print(dag)

import functools
import json
from datetime import timedelta
from typing import Any

from airflow import models
from airflow.models import variable
from airflow.providers.google.cloud.operators import kubernetes_engine
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.utils import task_group
from dateutil import parser
import shlex


SLACK_WEBHOOK = SlackWebhookHook(slack_webhook_conn_id="slack_connection_id")
EMOJI_MAP: dict[str, str] = {
    "failed": "red-siren",
    "success": "check-green",
}



def get_slack_alert_from_context(context: dict[str, models.TaskInstance]):
    task_instance: models.TaskInstance = context['task_instance']
    state: str = task_instance.state or ''
    dag_id: str = task_instance.dag_id
    log_url: str = task_instance.log_url
    emoji: str = EMOJI_MAP[state]
    SLACK_WEBHOOK.send(
        text=f":{emoji}: {dag_id} *{state.replace('_', ' ')}!*\nLogs: {log_url}",
        blocks=[
            {
                'type': 

[output truncated]

## Compile to Mermaid

In [16]:
mermaid = example_pipeline.compile(adapter=adapters.Adapters.MERMAID)
print(mermaid)

---
title: example_pipeline
---
graph LR
hello_peacock ---> hello_world
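
The Mermaid output is a direct rendering of each task's `depends_on` list: every dependency becomes one edge. A minimal sketch of that mapping is shown below. `to_mermaid` is a hypothetical function written to reproduce the output above; the real adapter may handle cases such as tasks without any edges differently.

```python
def to_mermaid(title: str, depends_on: dict[str, list[str]]) -> str:
    """Render a dependency mapping as a left-to-right Mermaid graph."""
    lines = ["---", f"title: {title}", "---", "graph LR"]
    for task, upstream_tasks in depends_on.items():
        for upstream in upstream_tasks:
            # One edge per dependency, pointing from upstream to downstream.
            lines.append(f"{upstream} ---> {task}")
    return "\n".join(lines)


diagram = to_mermaid(
    "example_pipeline",
    {"hello_peacock": [], "hello_world": ["hello_peacock"]},
)
```

Note that this sketch only emits edges, so a pipeline with a single task would produce a graph with no nodes.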



## Saving and loading your pipeline to and from algo features

When your pipeline is ready to go, you can save it in algo features using the `to_config()` method.

The inverse of this method, `from_config(name='example_pipeline')`, loads an existing config so that you can run or edit it.

In [17]:
example_pipeline.to_config()

In [18]:
example_pipeline_from_config = pipeline.Pipeline.from_config(name='example_pipeline')
example_pipeline_from_config.show()

In [19]:
example_pipeline_from_config.run()

hello Peacock!
hello World!


In [20]:
example_pipeline_from_config == example_pipeline

True