### Node

In [1]:
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline

In [2]:
def add(x, y):
    """Return the result of ``x + y``.

    Works for any pair of operands that support the ``+`` operator.
    """
    total = x + y
    return total

In [3]:
# Wrap `add` in a node: it reads the datasets 'x' and 'y' and writes 'sum'.
add_node = node(
    func=add,
    inputs=['x', 'y'],
    outputs='sum',
    name='adding_x_and_y',
)
print(add_node)  # str() form: "name: func([inputs]) -> [outputs]"
add_node  # last expression, so the cell also displays the repr

adding_x_and_y: add([x,y]) -> [sum]


Node(add, ['x', 'y'], 'sum', 'adding_x_and_y')

### Pipeline

In [4]:
def mean(xs, n):
    """Return the sum of ``xs`` divided by ``n``.

    ``n`` is supplied by the caller rather than computed here, so the
    count can come from a separate upstream step.
    """
    total = sum(xs)
    return total / n

In [5]:
def mean_sos(xs, n):
    """Return the mean of squares: the sum of ``x**2`` over ``xs``, divided by ``n``."""
    squares_total = 0
    for value in xs:
        squares_total = squares_total + value ** 2
    return squares_total / n

In [6]:
def variance(m, m2):
    """Return ``m2 - m*m``.

    With ``m`` the mean and ``m2`` the mean of squares, this is the
    variance computed as E[x^2] - (E[x])^2.
    """
    squared_mean = m * m
    return m2 - squared_mean

In [7]:
# Four nodes wired together purely by dataset names:
# xs -> n, (xs, n) -> m, (xs, n) -> m2, (m, m2) -> v.
variance_pipeline = pipeline(
    [
        node(func=len, inputs='xs', outputs='n', name='length'),
        node(func=mean, inputs=['xs', 'n'], outputs='m', name='mean_value'),
        node(func=mean_sos, inputs=['xs', 'n'], outputs='m2', name='mean_square'),
        node(func=variance, inputs=['m', 'm2'], outputs='v', name='variance'),
    ]
)
# The printed order may differ from the listing order above.
print(variance_pipeline)

Pipeline([
Node(len, 'xs', 'n', 'length'),
Node(mean_sos, ['xs', 'n'], 'm2', 'mean_square'),
Node(mean, ['xs', 'n'], 'm', 'mean_value'),
Node(variance, ['m', 'm2'], 'v', 'variance')
])


In [8]:
# Print the execution plan: free inputs, node names in run order, final outputs.
print(variance_pipeline.describe())

#### Pipeline execution order ####
Inputs: xs

length
mean_square
mean_value
variance

Outputs: v
##################################


In [9]:
# Two partial pipelines plus a stand-alone node, all left unnamed.
pipeline_de = pipeline(
    [
        node(func=len, inputs='xs', outputs='n'),
        node(func=mean, inputs=['xs', 'n'], outputs='m'),
    ]
)
pipeline_ds = pipeline(
    [
        node(func=mean_sos, inputs=['xs', 'n'], outputs='m2'),
        node(func=variance, inputs=['m', 'm2'], outputs='v'),
    ]
)
# Terminal node: consumes 'v' and produces no dataset.
last_node = node(func=print, inputs='v', outputs=None)

# A pipeline can be built from a mix of pipelines and nodes.
consolidated_pipeline = pipeline([pipeline_de, pipeline_ds, last_node])
print(consolidated_pipeline.describe())

#### Pipeline execution order ####
Inputs: xs

len([xs]) -> [n]
mean([xs,n]) -> [m]
mean_sos([xs,n]) -> [m2]
variance([m,m2]) -> [v]
print([v]) -> None

Outputs: None
##################################


In [10]:
# Keep the pipeline's nodes in a variable so individual nodes can be inspected.
nodes = consolidated_pipeline.nodes
nodes

[Node(len, 'xs', 'n', None),
 Node(mean, ['xs', 'n'], 'm', None),
 Node(mean_sos, ['xs', 'n'], 'm2', None),
 Node(variance, ['m', 'm2'], 'v', None),
 Node(print, 'v', None, None)]

In [11]:
# Input dataset names of the first node.
nodes[0].inputs

['xs']

In [12]:
# Output dataset names of the first node.
nodes[0].outputs

['n']

In [13]:
# Deliberate failure: a node whose function is None is rejected.
# The exception is caught so the cell shows the message instead of a traceback.
try:
    pipeline(node(None, None, None))
except Exception as error:
    print(error)

Invalid Node definition: first argument must be a function, not 'NoneType'.
Format should be: node(function, inputs, outputs)


In [14]:
# Deliberate failure: a valid function, but neither inputs nor outputs.
try:
    pipeline(node(sum, None, None))
except Exception as error:
    print(error)

Invalid Node definition: it must have some 'inputs' or 'outputs'.
Format should be: node(function, inputs, outputs)


In [15]:
# Deliberate failure: 'x' -> 'y' and 'y' -> 'x' form a cycle, so no valid
# execution order exists.
try:
    pipeline(
        [
            node(lambda x: x + 1, 'x', 'y'),
            node(lambda y: y - 1, 'y', 'x'),
        ]
    )
except Exception as error:
    print(error)

Circular dependencies exist among these items: ['<lambda>([x]) -> [y]', '<lambda>([y]) -> [x]']


In [16]:
def defrost(x):
    """Placeholder step: hand the input straight through.

    Returns:
        The argument ``x`` unchanged.
    """
    # Return the parameter itself; no name ``y`` exists in this scope.
    return x


def grill(y):
    """Placeholder step: hand the input straight through.

    Returns:
        The argument ``y`` unchanged.
    """
    # Return the parameter itself; no name ``z`` exists in this scope.
    return y


def eat(a):
    """Terminal placeholder step: accept ``a`` and produce nothing.

    Returns:
        Always ``None``.
    """
    return None

# Two independently defined pipelines: one prepares the food, one consumes it.
cook_pipeline = pipeline(
    [
        node(func=defrost, inputs='frozen_veg', outputs='veg'),
        node(func=grill, inputs='veg', outputs='grilled_veg'),
    ]
)
meal_pipeline = pipeline(
    [node(func=eat, inputs='food', outputs=None)]
)

# The cook pipeline emits 'grilled_veg' while the meal pipeline reads 'food',
# so a plain `cook_pipeline + meal_pipeline` would leave them disconnected.
# Re-mapping the output name 'grilled_veg' -> 'food' joins the two.
pre_pipeline = pipeline(pipe=cook_pipeline, outputs={'grilled_veg': 'food'})
consolidated = pre_pipeline + meal_pipeline
print(consolidated.describe())

#### Pipeline execution order ####
Inputs: frozen_veg

defrost([frozen_veg]) -> [veg]
grill([veg]) -> [food]
eat([food]) -> None

Outputs: None
##################################


### End !!!