# Simple ETL - Graflow Minimal Example
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/GraflowAI/graflow-examples/blob/main/examples/notebooks/simple_etl.ipynb)

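A minimal Extract → Transform → Load (ETL) workflow: one task extracts records, two transform tasks process them in parallel, and a final task merges and prints the results.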

## Prerequisites

Install the Graphviz development package and Graflow with its `visualization` extra, then import the core modules:
- **`@task`** — Decorator to define a workflow task
- **`workflow`** — Context manager to build and execute a task graph
- **`TaskExecutionContext`** — Provides access to the shared channel for inter-task communication
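To make `inject_context=True` concrete, here is a toy model of the idea in plain Python. It is not Graflow's implementation: `toy_task`, `ToyContext`, and `ToyChannel` are hypothetical names. The decorator wraps a function as a task and, when the task runs, passes a context object as the first argument so the task can reach the shared channel.

```python
import functools
from typing import Any, Callable, Dict


class ToyChannel:
    """Hypothetical stand-in for a shared key-value channel."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value

    def get(self, key: str) -> Any:
        return self._data[key]


class ToyContext:
    """Hypothetical execution context that hands out the shared channel."""

    def __init__(self, channel: ToyChannel) -> None:
        self._channel = channel

    def get_channel(self) -> ToyChannel:
        return self._channel


def toy_task(inject_context: bool = False) -> Callable[[Callable], Callable]:
    """Hypothetical decorator: wraps fn so a runner can call it with a context."""

    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def run(ctx: ToyContext) -> Any:
            # Only tasks that opted in receive the context as their first argument
            return fn(ctx) if inject_context else fn()

        return run

    return decorator


@toy_task(inject_context=True)
def produce(ctx: ToyContext) -> None:
    ctx.get_channel().set("greeting", "hello")


@toy_task(inject_context=True)
def consume(ctx: ToyContext) -> str:
    return ctx.get_channel().get("greeting").upper()


shared = ToyContext(ToyChannel())
produce(shared)         # writes "greeting" to the channel
print(consume(shared))  # prints HELLO
```

The point of the pattern is that tasks never call each other directly; they only exchange data through the channel held by the context.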

```
!apt-get -qq install -y libgraphviz-dev
!pip install -q -U "graflow[visualization]"
```

```python
from graflow.core.context import TaskExecutionContext
from graflow.core.decorators import task
from graflow.core.workflow import workflow
```

## Task Definitions

Define four tasks and wire them with the `>>` (sequential) and `|` (parallel) operators, so the two transforms run side by side between `extract` and `load`:

```
extract >> (filter_pass | assign_grade) >> load
```
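Python routes `>>` and `|` to the `__rshift__` and `__or__` methods, which is what lets a library give these operators a graph-building meaning. The toy builder below illustrates one way the expression above could turn into dependency edges, assuming `a >> b` means "b waits for a" and `a | b` means "a and b form one parallel group". `Node`, `Group`, and `EDGES` are hypothetical names, not Graflow classes.

```python
from typing import List, Set, Tuple

# Every dependency created by ">>" is recorded here as (upstream, downstream)
EDGES: Set[Tuple[str, str]] = set()


class Node:
    """A hypothetical task node in the toy graph."""

    def __init__(self, name: str) -> None:
        self.name = name

    def members(self) -> List["Node"]:
        return [self]

    def __or__(self, other: "Node") -> "Group":
        # a | b: bundle both sides into one parallel group (nested groups are flattened)
        return Group(self.members() + other.members())

    def __rshift__(self, other: "Node") -> "Node":
        # a >> b: every member on the left must finish before every member on the right
        for up in self.members():
            for down in other.members():
                EDGES.add((up.name, down.name))
        # Return the right-hand side so a >> b >> c keeps chaining
        return other


class Group(Node):
    """A hypothetical parallel group: no edges are created between its members."""

    def __init__(self, nodes: List[Node]) -> None:
        super().__init__(" | ".join(n.name for n in nodes))
        self.nodes = nodes

    def members(self) -> List[Node]:
        return self.nodes


t = {name: Node(name) for name in ("extract", "filter_pass", "assign_grade", "load")}
_ = t["extract"] >> (t["filter_pass"] | t["assign_grade"]) >> t["load"]
for edge in sorted(EDGES):
    print(edge)
```

The parentheses matter: in Python `>>` binds more tightly than `|`, so `extract >> filter_pass | assign_grade >> load` would parse as `(extract >> filter_pass) | (assign_grade >> load)`.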

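Each task is declared with `inject_context=True`, so it receives a `TaskExecutionContext` as its first argument; `ctx.get_channel()` returns the shared **channel**, a key-value store for passing data between tasks. `extract` writes `raw_data`; the two transform tasks read it concurrently and write `passed` and `graded`; `load` runs after both have finished and merges the two results.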
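The hand-off pattern described above (one writer, two concurrent transforms, one reader that waits for both) can be sketched with only the standard library. This sketch assumes a thread-based executor and a lock-protected dict as the channel; `LockedChannel`, `filter_step`, and `count_step` are hypothetical names, and Graflow's own channel and scheduler may work differently.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Optional


class LockedChannel:
    """Hypothetical key-value channel that is safe to share between threads."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def set(self, key: str, value: Any) -> None:
        with self._lock:
            self._data[key] = value

    def get(self, key: str, default: Optional[Any] = None) -> Any:
        with self._lock:
            return self._data.get(key, default)


channel = LockedChannel()
channel.set("raw_data", [{"name": "Alice", "score": 85}, {"name": "Bob", "score": 42}])


def filter_step() -> None:
    # Reads the shared input, writes its own output key
    channel.set("passed", [r for r in channel.get("raw_data") if r["score"] >= 50])


def count_step() -> None:
    # Reads the same input concurrently, writes a different key
    channel.set("count", len(channel.get("raw_data")))


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(filter_step), pool.submit(count_step)]
    for f in futures:
        # Blocks until each transform is done and re-raises its exception;
        # this wait plays the role of the barrier before the load step
        f.result()

print(channel.get("passed"), channel.get("count"))
```

Because each transform writes its own key, the two writers never overwrite each other's results; the lock only protects the underlying dict.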

```python
@task(inject_context=True)
def extract(ctx: TaskExecutionContext):
    """Load raw data and publish it on the channel."""
    raw_data = [
        {"name": "Alice", "score": 85},
        {"name": "Bob", "score": 42},
        {"name": "Charlie", "score": 73},
        {"name": "Diana", "score": 91},
    ]
    print(f"[Extract] {len(raw_data)} records loaded")
    ctx.get_channel().set("raw_data", raw_data)


@task(inject_context=True)
def filter_pass(ctx: TaskExecutionContext):
    """Keep records with score >= 50."""
    channel = ctx.get_channel()
    raw = channel.get("raw_data")
    passed = [r for r in raw if r["score"] >= 50]
    print(f"[Filter] {len(raw)} -> {len(passed)} records passed")
    channel.set("passed", passed)


@task(inject_context=True)
def assign_grade(ctx: TaskExecutionContext):
    """Assign a letter grade (A/B/C/F) to each record."""
    channel = ctx.get_channel()
    raw = channel.get("raw_data")
    graded = []
    for r in raw:
        score = r["score"]
        if score >= 90:
            grade = "A"
        elif score >= 70:
            grade = "B"
        elif score >= 50:
            grade = "C"
        else:
            grade = "F"
        graded.append({**r, "grade": grade})
    print(f"[Grade] {len(graded)} records graded")
    channel.set("graded", graded)


@task(inject_context=True)
def load(ctx: TaskExecutionContext):
    """Join the two transform outputs and print the results."""
    channel = ctx.get_channel()
    # Names of the records that survived filter_pass
    passed_names = {r["name"] for r in channel.get("passed")}
    graded = channel.get("graded")
    # Keep only graded records whose name passed the filter
    results = [r for r in graded if r["name"] in passed_names]
    print("[Load] Results:")
    for r in results:
        print(f"  {r['name']}: {r['score']} ({r['grade']})")
```
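The transform logic itself does not depend on Graflow. As an illustration that is not part of the original notebook, the same filter, grading, and merge rules can be written as plain functions and checked without running a workflow. `grade`, `filter_passed`, `assign_grades`, and `merge` are hypothetical helper names, and the grading swaps the chain of conditionals for a `bisect` lookup over the cut-offs 50, 70, and 90.

```python
from bisect import bisect_right
from typing import Any, Dict, List

Record = Dict[str, Any]

# Lower score bounds for grades C, B and A; anything below 50 is an F
CUTOFFS = [50, 70, 90]
GRADES = "FCBA"


def grade(score: int) -> str:
    """Map a score to a letter: bisect_right counts the cut-offs the score reaches."""
    return GRADES[bisect_right(CUTOFFS, score)]


def filter_passed(records: List[Record]) -> List[Record]:
    return [r for r in records if r["score"] >= CUTOFFS[0]]


def assign_grades(records: List[Record]) -> List[Record]:
    return [{**r, "grade": grade(r["score"])} for r in records]


def merge(passed: List[Record], graded: List[Record]) -> List[Record]:
    # Same join as the load task: keep graded records whose name passed the filter
    passed_names = {r["name"] for r in passed}
    return [r for r in graded if r["name"] in passed_names]


raw = [
    {"name": "Alice", "score": 85},
    {"name": "Bob", "score": 42},
    {"name": "Charlie", "score": 73},
    {"name": "Diana", "score": 91},
]
for r in merge(filter_passed(raw), assign_grades(raw)):
    print(f"  {r['name']}: {r['score']} ({r['grade']})")
# Alice: 85 (B), Charlie: 73 (B), Diana: 91 (A)
```

`bisect_right` places a score that equals a cut-off to the right of it, which matches the `>=` comparisons in `assign_grade`.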

## Run Workflow

Use the `workflow()` context manager to build the task graph and run it. Inside the `with` block, `>>` wires tasks sequentially, `|` groups tasks to run in parallel, and `.set_group_name("transforms")` names that parallel group. `wf.execute("extract")` then starts execution from the entry task, `extract`.

```python
with workflow("hello_etl") as wf:
    _ = extract >> (filter_pass | assign_grade).set_group_name("transforms") >> load
    wf.execute("extract")
```
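To illustrate what running a dependency graph from its entry task involves, here is a scheduler sketch built on the standard library's `graphlib.TopologicalSorter` (Python 3.9+): a task becomes ready once all of its predecessors are done, and every ready task is submitted to a thread pool at once. This is an assumed model for illustration, not Graflow's scheduler; `run_graph` and the `deps` mapping are hypothetical.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter
from typing import Callable, Dict, List, Set


def run_graph(deps: Dict[str, Set[str]], actions: Dict[str, Callable[[], None]]) -> List[str]:
    """Run each action once all of its dependencies are done; return completion order."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    finished: List[str] = []
    running: Dict[Future, str] = {}
    with ThreadPoolExecutor() as pool:
        while sorter.is_active():
            # Submit every task whose predecessors have all completed
            for name in sorter.get_ready():
                running[pool.submit(actions[name])] = name
            # Block until at least one running task finishes
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for fut in done:
                name = running.pop(fut)
                fut.result()  # re-raise any error from the task
                finished.append(name)
                sorter.done(name)  # may unlock downstream tasks
    return finished


# Hypothetical dependency map for the ETL graph: task -> tasks it waits for
deps = {
    "extract": set(),
    "filter_pass": {"extract"},
    "assign_grade": {"extract"},
    "load": {"filter_pass", "assign_grade"},
}
actions = {name: (lambda n=name: print(f"running {n}")) for name in deps}
order = run_graph(deps, actions)
print(order)  # extract first, load last; the transforms may finish in either order
```

Scheduling on completion events rather than in fixed topological "layers" lets a downstream task start as soon as its own inputs are ready, even if unrelated tasks are still running.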

## Visualization

`print(wf.graph)` prints an ASCII diagram of the graph via `TaskGraph.__str__`. `draw_mermaid_png` renders the graph as a PNG image through the mermaid.ink web API, so that cell needs network access.

```python
print("Task Graph (ASCII):")
print("-" * 70)
print(wf.graph)
```

```python
from IPython.display import Image, display
from graflow.utils.graph import draw_mermaid_png

png_bytes = draw_mermaid_png(wf.graph.nx_graph(), title="Simple ETL Pipeline")
display(Image(png_bytes))
```
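Judging by its name and the mermaid.ink service it calls, `draw_mermaid_png` presumably converts the graph into Mermaid diagram text before rendering it. As a rough illustration of what such text looks like, the hypothetical helper `to_mermaid` below turns an edge list into a Mermaid flowchart definition; the exact text Graflow generates may differ.

```python
from typing import Iterable, Tuple


def to_mermaid(edges: Iterable[Tuple[str, str]], direction: str = "TD") -> str:
    """Render (upstream, downstream) pairs as a Mermaid flowchart definition."""
    lines = [f"flowchart {direction}"]
    for up, down in sorted(edges):
        lines.append(f"    {up} --> {down}")
    return "\n".join(lines)


etl_edges = [
    ("extract", "filter_pass"),
    ("extract", "assign_grade"),
    ("filter_pass", "load"),
    ("assign_grade", "load"),
]
print(to_mermaid(etl_edges))
```

Pasting the printed text into any Mermaid renderer should draw the same diamond shape. With the notebook's graph, the edge list could presumably come from `wf.graph.nx_graph().edges()`, assuming `nx_graph()` returns a NetworkX graph as its name suggests.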