# Pipeline 

Model a simple pipeline

In [None]:
# Begin - startup boilerplate code

import pkgutil

if 'fibertree_bootstrap' not in [pkg.name for pkg in pkgutil.iter_modules()]:
  !python3 -m pip  install git+https://github.com/Fibertree-project/fibertree-bootstrap --quiet

# End - startup boilerplate code


from fibertree_bootstrap import *
fibertree_bootstrap()

In [None]:
# Build the pipeline's input: a randomly populated tensor named "s0" with a
# single rank "X". The seed is fixed so the same tensor is produced on every run.
# NOTE(review): the positional arguments presumably are shape [10],
# density [0.5] and a value interval of 10 - confirm against Tensor.fromRandom.
s0 = Tensor.fromRandom(["X"], [10], [0.5], 10, seed=3, name="s0")
# Show the tensor graphically, then print it using the "n*" format spec
# (textual form; exact meaning of the spec is defined by the fibertree package).
displayTensor(s0)
print(f"{s0:n*}")

## Simple 3-stage pipeline

Each stage processes (adds one to) the elements of a rank-1 tensor (one element per cycle) and passes the result to the next stage. There is a delay of **stage_delay** cycles between the generation of a result and its use in the next stage.

Note that the intermediate buffers between stages are represented as full tensors, although the actual occupancy of a buffer at any cycle will be much less than the entire length of the tensor.

In [None]:
# Simple 3-stage pipeline: s0 -> s1 -> s2 -> s3, each stage adding one to
# every element it receives.

# Output tensors of the three stages, all with the single rank "X".
s1 = Tensor(rank_ids=["X"], name="s1")
s2 = Tensor(rank_ids=["X"], name="s2")
s3 = Tensor(rank_ids=["X"], name="s3")

# Value passed as the wait on the producing tensor by stages 2 and 3.
stage_delay = 3

# Canvas tracking the input and all three stage outputs. The highlight tuples
# passed to addActivity() below line up positionally with (s0, s1, s2, s3).
canvas = createCanvas(s0, s1, s2, s3, enable_wait=True)

# Root fibers of each tensor, used for traversal and population.
s0_x = s0.getRoot()
s1_x = s1.getRoot()
s2_x = s2.getRoot()
s3_x = s3.getRoot()

# Manual cycle counter; used as the skew of stage 1's activity.
cycle= 0

# Stage 1
# s1[x] = s0[x] + 1, one element per cycle; highlights s0 and s1 at x0.

for x0, (s1_ref, s0_val) in s1_x << s0_x:
    s1_ref <<= s0_val+1
    canvas.addActivity((x0,), (x0,), (), (), worker="PE0", skew=cycle)
    cycle += 1

# Stage 2
# s2[x] = s1[x] + 1; each activity waits on "s1" with a delay of stage_delay.

for x1, (s2_ref, s1_val) in s2_x << s1_x:
    s2_ref <<= s1_val+1
    canvas.addActivity((), (x1,), (x1,), (), worker="PE1", wait={"s1":stage_delay})

# Stage 3
# s3[x] = s2[x] + 1; each activity waits on "s2" with a delay of stage_delay.

for x2, (s3_ref, s2_val) in s3_x << s2_x:
    s3_ref <<= s2_val+1
    canvas.addActivity((), (), (x2,), (x2,), worker="PE2", wait={"s2":stage_delay})

        
# Render the recorded activity of all three stages.
displayCanvas(canvas)

## Two-stage parallel pipeline

Each stage processes (adds one to) the elements of a rank-1 tensor (**NUM_PEs** elements per cycle) and passes the results to the next stage. The parallelism is represented by splitting the input tensor via **splitEqual()** into the work for each cycle. There is a delay of **stage_delay** cycles between the generation of a result and its use in the next stage.

In [None]:
# Two-stage parallel pipeline: s0_split -> s1 -> s2, each stage adding one to
# every element it receives.

# Stage outputs use two ranks ("X.1", "X.0"), matching the rank-2 traversal of
# the split input below.
s1 = Tensor(rank_ids=["X.1", "X.0"], name="s1")
s2 = Tensor(rank_ids=["X.1", "X.0"], name="s2")
# NOTE(review): s3 (and s3_x below) are created but never used in this cell,
# and s3 is not added to the canvas - confirm whether they can be dropped.
s3 = Tensor(rank_ids=["X"], name="s3")

NUM_PEs = 2
# Value passed as the wait on "s1" by stage 2.
stage_delay = 2

# Split the input with splitEqual(NUM_PEs); the result is traversed as an
# upper (x1) and lower (x0) rank in the loops below.
s0_split = s0.splitEqual(NUM_PEs)

# Canvas tracking the split input and both stage outputs. Highlight tuples
# passed to addActivity() line up positionally with (s0_split, s1, s2).
canvas = createCanvas(s0_split, s1, s2, enable_wait=True)

# Root fibers of each tensor, used for traversal and population.
s0_split_x1 = s0_split.getRoot()
s1_x1 = s1.getRoot()
s2_x1 = s2.getRoot()
s3_x = s3.getRoot()

# Helper whose current value (cycle()) is used as the skew of stage 1's
# activity; presumably it tracks cycles across the parallel workers bracketed
# by the start/finish calls below - confirm against CycleManager.
cycle = CycleManager()

# Stage 1
# s1[x1, x0] = s0_split[x1, x0] + 1; the position of each element within its
# x1 group (pe) selects the worker name.
# NOTE(review): only two highlight tuples are passed here although the canvas
# holds three tensors - confirm addActivity() treats the missing one as empty.

for x1, (s1_x0, s0_x0) in s1_x1 << s0_split_x1:
    cycle.startParallel()
    for pe, (x0, (s1_ref, s0_val)) in enumerate(s1_x0 << s0_x0):
        cycle.startWorker()
        s1_ref <<= s0_val+1
        canvas.addActivity((x1,x0), (x1,x0), worker=f"S1-PE{pe}", skew=cycle())
        cycle.finishWorker()
    cycle.finishParallel()

# Stage 2
# s2[x1, x0] = s1[x1, x0] + 1; each activity waits on "s1" with a delay of
# stage_delay.

for x1, (s2_x0, s1_x0) in s2_x1 << s1_x1:
    for pe, (x0, (s2_ref, s1_val)) in enumerate(s2_x0 << s1_x0):
        s2_ref <<= s1_val+1
        canvas.addActivity((), (x1,x0), (x1,x0), worker=f"S2-PE{pe}", wait={"s1":stage_delay})


        
# Render the recorded activity of both stages.
displayCanvas(canvas)