# plumbum Tutorial

This notebook walks through the core ideas behind **plumbum** pipelines.
It covers synchronous operators, iterable helpers, jq-style query helpers, and their async variants.


## Getting Started

Operators are regular Python callables decorated with `@pb`. Data is threaded
into the first argument using the `>` operator.
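How can `>` run an operator at all? For `value > op`, Python first asks `type(value)` to compare itself with the operator. When that returns `NotImplemented`, Python falls back to the reflected method `op.__lt__(value)`, and that gives the operator a chance to run. The sketch below uses this hook in a hypothetical `ToyOp` class. It is not plumbum's actual implementation. It only shows how bound arguments can end up after the threaded value.

```python
class ToyOp:
    """Hypothetical stand-in for a plumbum operator: a function plus bound arguments."""

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __call__(self, *args, **kwargs):
        # Calling the operator binds arguments instead of running the function.
        return ToyOp(self.func, *self.args, *args, **{**self.kwargs, **kwargs})

    def __lt__(self, data):
        # `data > op` reaches here once type(data).__gt__ returns NotImplemented;
        # the data becomes the first argument, followed by the bound ones.
        return self.func(data, *self.args, **self.kwargs)


@ToyOp
def add(value, amount):
    return value + amount


result = 5 > add(3)  # runs add(5, 3)
```

Comparison operators bind more loosely than `|`, so `5 > a | b` groups as `5 > (a | b)`. That is why whole pipelines can sit on the right-hand side without parentheses.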


In [None]:
from pdum.plumbum import pb
from pdum.plumbum.iterops import tee


@pb
def add(value: int, amount: int) -> int:
    return value + amount


@pb
def multiply(value: int, factor: int) -> int:
    return value * factor


@pb
def log(value):
    print(value)
    return value


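### Building Pipelines

Combine operators with `|`. Each stage receives the previous stage's result as its first argument, so threading `5` through `add(3) | multiply(2)` computes `(5 + 3) * 2`, which is `16`.

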
In [None]:
pipeline = add(3) | multiply(2)
5 > pipeline

### Wrapping Existing Callables

Use `pb()` directly when you need a one-off callable inside a pipeline. Wrapping gives the callable operator behavior, so it can receive data via `>` and compose with `|`, without forcing you to decorate everything in advance. Wrapping is only needed at the start of a pipeline. After the first stage, plain callables are converted automatically.
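Why does only the first stage need `pb()`? The sketch below assumes a hypothetical `ToyOp` whose `__or__` wraps whatever plain callable appears on its right. This is one plausible design, not plumbum's source. The left-most stage has no such hook. In `5 > print`, both operands are ordinary Python objects, so `>` falls back to a normal ordering comparison and raises `TypeError`.

```python
class ToyOp:
    """Hypothetical minimal operator supporting `op | callable` and `data > op`."""

    def __init__(self, func):
        self.func = func

    def __or__(self, other):
        # Plain callables on the right-hand side are wrapped automatically.
        nxt = other if isinstance(other, ToyOp) else ToyOp(other)
        return ToyOp(lambda data: nxt.func(self.func(data)))

    def __lt__(self, data):
        # Reflected form of `data > op`: run the composed function on the data.
        return self.func(data)


# Only the first stage is wrapped explicitly; __or__ converts the lambda.
shout = ToyOp(str.upper) | (lambda s: s + '!')
result = 'hello' > shout

# Without a wrapper, `>` is an ordinary comparison between an int and a function.
error = ''
try:
    5 > print
except TypeError as exc:
    error = str(exc)
```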



In [None]:
5 > pb(print)  # emits 5
'hello' > pb(str.upper) | (lambda s: s + '!') | print


### Keyword Arguments and Incremental Binding

Operators accept keyword arguments just like the underlying function, and you can partially apply additional arguments in later calls.
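To predict what the cell below returns, write out the plain call that each threaded expression stands for. The threaded value becomes the first argument, and any positional arguments bound at call time follow it. For incremental keyword binding, the standard library's `functools.partial` is a close analogy. It is only an analogy: `partial` places its bound positional arguments *before* the call's own arguments, which is the opposite of plumbum's threading.

```python
from functools import partial


def greet(name: str, greeting: str = 'Hello', punctuation: str = '!') -> str:
    return f'{greeting}, {name}{punctuation}'


# Plain-call equivalent of: 'Alice' > greet(greeting='Hi')
hi_alice = greet('Alice', greeting='Hi')
# Plain-call equivalent of: 'Diana' > greet('Greetings', punctuation='...')
greetings_diana = greet('Diana', 'Greetings', punctuation='...')

# Analogy only: each partial() call layers more keyword arguments on top.
casual = partial(greet, greeting='Hi')
casual_question = partial(casual, punctuation='?')
hi_bob = casual_question('Bob')
```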



In [None]:
@pb
def greet(name: str, greeting: str = 'Hello', punctuation: str = '!') -> str:
    return f'{greeting}, {name}{punctuation}'

'Alice' > greet(greeting='Hi')
'Diana' > greet('Greetings', punctuation='...')


### Debugging Pipelines

Insert `tee` or another side-effecting callable, such as `log` above, when you need to peek at intermediate values without tearing the pipeline apart.
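The debugging pipeline in the next cell is equivalent to nesting the calls by hand. The plain-Python sketch below spells that out. It uses a hypothetical `log` that records values in a list rather than printing them, so the intermediate values can be checked.

```python
seen = []


def log(value):
    # Hypothetical recording variant of `log`: remember the value, pass it through.
    seen.append(value)
    return value


def add(value, amount):
    return value + amount


def multiply(value, factor):
    return value * factor


# Plain-call equivalent of: 10 > add(5) | log | multiply(2) | log
result = log(multiply(log(add(10, 5)), 2))
```

Because `log` returns its input unchanged, removing it from the pipeline does not change the result.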



In [None]:
debug_pipeline = add(5) | log | multiply(2) | log
result = 10 > debug_pipeline
result


### Working with Arbitrary Types

Because plumbum only manages call signatures, any Python object can flow through a pipeline.



In [None]:
class Point:
    def __init__(self, x: int, y: int) -> None:
        self.x = x
        self.y = y

    def __repr__(self) -> str:
        return f'Point({self.x}, {self.y})'

@pb
def translate(point: Point, dx: int, dy: int) -> Point:
    return Point(point.x + dx, point.y + dy)

Point(1, 2) > translate(5, 3)
{'a': 1} > pb(lambda mapping: {**mapping, 'b': 2})


## Iterable Helpers

`pdum.plumbum.iterops` exposes ready-made operators for working with
iterables. They compose like normal operators.
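If you know the standard library, here is a reasonable mental model. It is an assumption based on the helper names and the examples in this notebook, not a statement about their implementation. `select(f)` behaves like `map(f, ...)`, `where(pred)` behaves like `filter(pred, ...)`, and `batched(n)` splits the input into consecutive groups of `n`. The sketch below writes those out in plain Python. The tuple container for batches is this sketch's choice and matches `itertools.batched` in Python 3.12+.

```python
from itertools import islice

numbers = [1, 2, 3, 4, 5]

# Assumed plain-Python counterparts of select(...) and where(...).
incremented = list(map(lambda value: value + 1, numbers))
evens = list(filter(lambda value: value % 2 == 0, numbers))


def batches(items, size):
    """Hypothetical batched(): yield tuples of up to `size` items; the last may be shorter."""
    iterator = iter(items)
    while batch := tuple(islice(iterator, size)):
        yield batch


pairs = list(batches(numbers, 2))
```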


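In [None]:
from pdum.plumbum.iterops import batched, select, where

numbers = [1, 2, 3, 4, 5]
# Multiply each number by 10, then keep the results above 20.
pipeline = select(lambda value: value * 10) | where(lambda value: value > 20)
list(numbers > pipeline)


In [None]:
list(numbers > batched(2))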


## Inspecting Pipelines

Use `tee` to observe intermediate values without breaking the flow.
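Where you place `tee` decides what you see. In the cell below, `tee` sits between `select` and `where`. Assuming, as the names suggest, that `select` maps and `where` filters, it therefore observes every incremented value, including the odd ones that `where` later drops. The sketch models this with a hypothetical pass-through generator, `peek`, that records items instead of printing them. Plumbum's own `tee` may report values differently.

```python
observed = []


def peek(items):
    """Hypothetical tee-like stage: record each item, then yield it unchanged."""
    for item in items:
        observed.append(item)
        yield item


numbers = [1, 2, 3, 4, 5]
incremented = (value + 1 for value in numbers)                     # like select(...)
kept = [value for value in peek(incremented) if value % 2 == 0]    # like where(...)
```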


In [None]:
from pdum.plumbum.iterops import select, tee, where

numbers = [1, 2, 3, 4, 5]
list(numbers > select(lambda value: value + 1) | tee | where(lambda value: value % 2 == 0))


## Async Pipelines

Async operators use the `@apb` decorator. You can await the threaded
result directly from a notebook cell.
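Here is a rough model of what an async pipeline does. It is a sketch built around a hypothetical `run_stages` helper, not plumbum's internals. The helper threads the value through each stage and, whenever a stage returns an awaitable, awaits it before moving on. The result of the whole pipeline is therefore something to await. The sketch calls `asyncio.run` so it works as a standalone script. Inside a notebook cell, the event loop is already running, so you would `await` the coroutine directly.

```python
import asyncio
import inspect


async def run_stages(value, *stages):
    """Hypothetical: pass `value` through each stage, awaiting awaitable results."""
    for stage in stages:
        value = stage(value)
        if inspect.isawaitable(value):
            value = await value
    return value


async def async_double(value: int) -> int:
    return value * 2


def add_three(value: int) -> int:
    return value + 3


# Mirrors `await (5 > (async_double | add(3)))` later in this notebook.
result = asyncio.run(run_stages(5, async_double, add_three))
```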


## jq-like Operators

The `pdum.plumbum.jq` module adds jq-inspired path querying and transformation helpers on top of regular pipelines.
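The paths in the cell below read like jq paths. The plain-Python sketch assumes that `'user.name'` walks nested dictionaries one key at a time and that `'scores[]'` targets every element of the `scores` list. The helpers `get_path` and `map_list_field` are hypothetical illustrations, not part of `pdum.plumbum.jq`.

```python
from typing import Any, Callable


def get_path(obj: Any, path: str) -> Any:
    """Hypothetical: follow a dotted path such as 'user.name' through nested dicts."""
    for key in path.split('.'):
        obj = obj[key]
    return obj


def map_list_field(obj: dict, key: str, func: Callable[[Any], Any]) -> dict:
    """Hypothetical: return a copy of `obj` with `func` applied to each item of obj[key],
    roughly the target of a path like 'scores[]'."""
    return {**obj, key: [func(item) for item in obj[key]]}


record = {'user': {'id': 1, 'name': 'Ada'}, 'scores': [10, 15]}
name = get_path(record, 'user.name')
doubled = map_list_field(record, 'scores', lambda score: score * 2)
```

Returning a new dictionary leaves the input record untouched, which keeps each pipeline stage free of hidden mutations.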



In [None]:
from pdum.plumbum import pb
from pdum.plumbum.jq import field, transform, group_by
from pdum.plumbum.iterops import select, where

records = [
    {'user': {'id': 1, 'name': 'Ada'}, 'scores': [10, 15]},
    {'user': {'id': 2, 'name': 'Linus'}, 'scores': [20]},
]

names = list(records > select(field('user.name')))
curved = records > transform('scores[]', lambda score: score * 1.1)
high_scorers = list(records > where(lambda row: max(row['scores']) >= 15) | group_by('user.id'))

names, curved, [(key, [row['user']['name'] for row in group]) for key, group in high_scorers]

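The last expression in the cell above unpacks `group_by` results as `(key, rows)` pairs. The plain-Python sketch below reproduces that shape. It assumes grouping by a dotted path and groups listed in first-seen key order. The helper `group_rows` is hypothetical, and plumbum's own ordering and container types may differ. The third record is extra example data added so that one group holds two rows.

```python
from collections import defaultdict


def group_rows(rows, path):
    """Hypothetical: return (key, rows) pairs, grouping rows by the value at `path`."""
    groups = defaultdict(list)
    for row in rows:
        key = row
        for part in path.split('.'):
            key = key[part]
        groups[key].append(row)
    return list(groups.items())


records = [
    {'user': {'id': 1, 'name': 'Ada'}, 'scores': [10, 15]},
    {'user': {'id': 2, 'name': 'Linus'}, 'scores': [20]},
    {'user': {'id': 1, 'name': 'Ada'}, 'scores': [5]},
]
grouped = group_rows(records, 'user.id')
summary = [(key, [row['user']['name'] for row in group]) for key, group in grouped]
```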

Async counterparts (prefixed with `a`) are available for async iterators. The call below promotes the whole pipeline to async.



In [None]:
from pdum.plumbum.jq import aexplode, agroup_by
from pdum.plumbum.aiterops import aiter

async def collect_async_records():
    async_records = [
        {'users': [{'id': 1}, {'id': 2}]},
        {'users': [{'id': 2}, {'id': 3}]},
    ]
    pipeline = aiter | aexplode('users') | agroup_by('id')
    grouped = await (async_records > pipeline)
    # Awaiting the pipeline yields an async iterator; drain it into a list.
    return [group async for group in grouped]

await collect_async_records()

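You can model the async cell above with async generators. The follow-up `agroup_by('id')` suggests that `aexplode('users')` yields each element of every record's `users` list, and the sketch assumes exactly that. Here `lift` plays the role of `aiter`. All names in this sketch are hypothetical, and it stops before grouping.

```python
import asyncio
from typing import Any, AsyncIterator, Iterable


async def lift(items: Iterable[Any]) -> AsyncIterator[Any]:
    """Hypothetical aiter-like stage: turn an ordinary iterable into an async iterator."""
    for item in items:
        yield item


async def explode(records: AsyncIterator[dict], key: str) -> AsyncIterator[Any]:
    """Hypothetical aexplode-like stage: yield every element of record[key]."""
    async for record in records:
        for item in record[key]:
            yield item


async def collect_user_ids() -> list:
    records = [
        {'users': [{'id': 1}, {'id': 2}]},
        {'users': [{'id': 2}, {'id': 3}]},
    ]
    return [user['id'] async for user in explode(lift(records), 'users')]


user_ids = asyncio.run(collect_user_ids())
```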

## Advanced Examples



### Data Processing Pipeline



In [None]:
@pb
def filter_positive(numbers):
    for number in numbers:
        if number > 0:
            yield number

@pb
def square_all(numbers):
    for number in numbers:
        yield number ** 2

process = filter_positive | square_all | sum
[-2, 3, -1, 4, 5] > process, [-10, 2, -5, 6] > process

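As a cross-check, the same computation in plain Python is one generator expression per input. It shows what the cell above should return.

```python
def process(numbers):
    # Same steps as filter_positive | square_all | sum, written inline.
    return sum(number ** 2 for number in numbers if number > 0)


results = process([-2, 3, -1, 4, 5]), process([-10, 2, -5, 6])
```

The pipeline version expresses the same steps as separately named operators that can be reused or rearranged individually.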

### String Processing



In [None]:
@pb
def strip(text: str) -> str:
    return text.strip()

@pb
def uppercase(text: str) -> str:
    return text.upper()

@pb
def replace(text: str, old: str, new: str) -> str:
    return text.replace(old, new)

clean_text = strip() | replace(' ', '_') | uppercase()
'  hello world  ' > clean_text


### Chaining with Built-ins



In [None]:
pipeline = pb(str.strip) | pb(str.upper) | pb(print)
'  test  ' > pipeline

### Async Operators

Operators defined with `@apb` wrap `async def` functions and can be mixed with synchronous operators such as `add`. The resulting pipeline is async, so await its threaded result.

In [None]:
from pdum.plumbum import apb


@apb
async def async_double(value: int) -> int:
    return value * 2


await (5 > (async_double | add(3)))

## Async Iterable Helpers

The `pdum.plumbum.aiterops` module mirrors the iterable helpers with async
variants prefixed by `a`.


In [None]:
from pdum.plumbum.aiterops import aiter, aselect, awhere


async def collect_async_values():
    pipeline = aiter | aselect(lambda value: value + 1) | awhere(lambda value: value % 2 == 0)
    iterator = await ([1, 2, 3, 4] > pipeline)
    return [item async for item in iterator]


await collect_async_values()

Helpers such as `aselect` and `awhere` accept either synchronous or asynchronous
callables, in the same way their synchronous counterparts accept plain functions.
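One way an async helper can accept both kinds of callables is to call the function and await the result only if it is awaitable. The sketch below shows this with a hypothetical `amap`. It is not the `aiterops` source, and `lift` again stands in for `aiter`.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable, Iterable


async def lift(items: Iterable[Any]) -> AsyncIterator[Any]:
    for item in items:
        yield item


async def amap(func: Callable[[Any], Any], items: AsyncIterator[Any]) -> AsyncIterator[Any]:
    """Hypothetical async map that accepts a sync or async `func`."""
    async for item in items:
        result = func(item)
        # Coroutine functions return awaitables; plain functions return values.
        if inspect.isawaitable(result):
            result = await result
        yield result


async def async_increment(value: int) -> int:
    await asyncio.sleep(0)
    return value + 1


async def main() -> tuple:
    from_sync = [v async for v in amap(lambda v: v + 1, lift([1, 2, 3]))]
    from_async = [v async for v in amap(async_increment, lift([1, 2, 3]))]
    return from_sync, from_async


sync_results, async_results = asyncio.run(main())
```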
