In [1]:
# Patch asyncio before any task is executed in this notebook.
# NOTE(review): presumably required so that event loops started while running
# tasks can nest inside the notebook's already-running loop - confirm.
import nest_asyncio
nest_asyncio.apply()

In [2]:
import pydra

@pydra.mark.task
def add_two(x):
    """Return ``x`` increased by two."""
    increment = 2
    return x + increment

# Create a task whose input x is a whole list; no splitter has been set yet.
task1 = add_two(x=[1, 2, 3])

In [3]:
# Before split() is called the task has no state (this evaluates to True).
task1.state is None

True

In [4]:
# Split the task over input "x"; the call hands back a FunctionTask (see the displayed repr).
task1.split("x")

<pydra.engine.task.FunctionTask at 0x7feea033b0d0>

In [5]:
# After split() the task carries a State object instead of None.
task1.state

<pydra.engine.state.State at 0x7fee80562950>

In [6]:
# Printing the state gives a readable summary: task name, splitter and combiner.
print(task1.state)

State for add_two with a splitter: add_two.x and combiner: []


In [7]:
# The stored splitter name is prefixed with the task name ('add_two.x').
task1.state.splitter

'add_two.x'

In [8]:
# Run the split task, then fetch the results: one Result per element of x.
task1()
task1.result()

[Result(output=Output(out=3), runtime=None, errored=False),
 Result(output=Output(out=4), runtime=None, errored=False),
 Result(output=Output(out=5), runtime=None, errored=False)]

In [9]:
# return_inputs=True pairs every Result with the input value that produced it.
task1.result(return_inputs=True)

[({'add_two.x': 1}, Result(output=Output(out=3), runtime=None, errored=False)),
 ({'add_two.x': 2}, Result(output=Output(out=4), runtime=None, errored=False)),
 ({'add_two.x': 3}, Result(output=Output(out=5), runtime=None, errored=False))]

In [10]:
# return_inputs="ind" pairs every Result with the index of the input element
# rather than its value.
task1.result(return_inputs="ind")

[({'add_two.x': 0}, Result(output=Output(out=3), runtime=None, errored=False)),
 ({'add_two.x': 1}, Result(output=Output(out=4), runtime=None, errored=False)),
 ({'add_two.x': 2}, Result(output=Output(out=5), runtime=None, errored=False))]

In [11]:
@pydra.mark.task
def add_var(a, b):
    """Return the sum of ``a`` and ``b``."""
    total = a + b
    return total

In [12]:
# Only "a" is split; the scalar b=10 is used for every element of a.
task2 = add_var(a=[1, 2, 3], b=10)
task2 = task2.split("a")
task2()
task2.result()

[Result(output=Output(out=11), runtime=None, errored=False),
 Result(output=Output(out=12), runtime=None, errored=False),
 Result(output=Output(out=13), runtime=None, errored=False)]

In [13]:
# Both inputs are lists of equal length; the splitter is attached separately afterwards.
task3 = add_var(a=[1, 2], b=[10, 100])

In [14]:
# Tuple splitter ("a", "b"): elements are paired position by position,
# i.e. (1, 10) and (2, 100), which yields two results.
task3.split(("a", "b"))
task3()
task3.result()

[Result(output=Output(out=11), runtime=None, errored=False),
 Result(output=Output(out=102), runtime=None, errored=False)]

In [15]:
# List splitter ["a", "b"]: every value of a is combined with every value of b,
# which yields four results.
task4 = add_var(a=[1, 2], b=[10, 100])
task4.split(["a", "b"])
task4()
task4.result()

[Result(output=Output(out=11), runtime=None, errored=False),
 Result(output=Output(out=101), runtime=None, errored=False),
 Result(output=Output(out=12), runtime=None, errored=False),
 Result(output=Output(out=102), runtime=None, errored=False)]

In [16]:
# task4 already has a splitter, so calling split() again without overwrite=True
# is rejected. The error is caught and printed (same "Exception: ..." text as an
# uncaught one would show) so that "Restart & Run All" continues past this cell.
try:
    task4.split(("a", "b"))
except Exception as err:
    print(f"{type(err).__name__}: {err}")

Exception: splitter has been already set, if you want to overwrite it - use overwrite=True

In [17]:
@pydra.mark.task
def add_vector(x1, y1, x2, y2):
    """Add the vectors (x1, y1) and (x2, y2) component by component.

    Returns the tuple (x1 + x2, y1 + y2).
    """
    x_sum = x1 + x2
    y_sum = y1 + y2
    return x_sum, y_sum

# Named task_vect rather than task5, because the name task5 is used further
# down for an unrelated add_var task and would silently replace this one.
# The tuple splitters pair x1 with y1 and x2 with y2; the enclosing list then
# combines every (x1, y1) pair with every (x2, y2) pair -> 2 * 3 = 6 results.
# NOTE(review): output_names=["x", "y"] is passed, yet the recorded results show
# a single `out` tuple - confirm this argument has an effect in the pydra
# version in use.
task_vect = add_vector(name="add_vect", output_names=["x", "y"],
                       x1=[10, 20], y1=[1, 2], x2=[10, 20, 30], y2=[10, 20, 30])
task_vect.split(splitter=[("x1", "y1"), ("x2", "y2")])
task_vect()
task_vect.result()

[Result(output=Output(out=(20, 11)), runtime=None, errored=False),
 Result(output=Output(out=(30, 21)), runtime=None, errored=False),
 Result(output=Output(out=(40, 31)), runtime=None, errored=False),
 Result(output=Output(out=(30, 12)), runtime=None, errored=False),
 Result(output=Output(out=(40, 22)), runtime=None, errored=False),
 Result(output=Output(out=(50, 32)), runtime=None, errored=False)]

In [18]:
# Outer splitter over a and b, then combine over "b": results that differ only
# in b are gathered into one list, leaving one list per value of a.
task5 = add_var(a=[1, 2], b=[10, 100])
task5.split(["a", "b"])
# adding combiner
task5.combine("b")
task5()
task5.result()

[[Result(output=Output(out=11), runtime=None, errored=False),
  Result(output=Output(out=101), runtime=None, errored=False)],
 [Result(output=Output(out=12), runtime=None, errored=False),
  Result(output=Output(out=102), runtime=None, errored=False)]]

In [19]:
# Request the inputs alongside the results to show how the combined lists are
# grouped: each inner list shares one value of a and spans all values of b.
all_results = task5.result(return_inputs=True)
print(f"first list, a=1: {all_results[0]}")
print(f"\n second list, a=2: {all_results[1]}")

first list, a=1: [({'add_var.a': 1, 'add_var.b': 10}, Result(output=Output(out=11), runtime=None, errored=False)), ({'add_var.a': 1, 'add_var.b': 100}, Result(output=Output(out=101), runtime=None, errored=False))]

 second list, a=2: [({'add_var.a': 2, 'add_var.b': 10}, Result(output=Output(out=12), runtime=None, errored=False)), ({'add_var.a': 2, 'add_var.b': 100}, Result(output=Output(out=102), runtime=None, errored=False))]


In [20]:
# Outer splitter over a and b, combined over "a" this time: results that differ
# only in a are gathered into one list, leaving one list per value of b.
task6 = add_var(a=[1, 2], b=[10, 100])
task6.split(["a", "b"])
# changing the combiner
task6.combine("a")
task6()
task6.result()

[[Result(output=Output(out=11), runtime=None, errored=False),
  Result(output=Output(out=12), runtime=None, errored=False)],
 [Result(output=Output(out=101), runtime=None, errored=False),
  Result(output=Output(out=102), runtime=None, errored=False)]]

In [21]:
# Request the inputs alongside the results: each inner list shares one value of
# b and spans all values of a.
all_results = task6.result(return_inputs=True)
print(f"first list, b=10: {all_results[0]}")
print(f"\n second list, b=100: {all_results[1]}")

first list, b=10: [({'add_var.a': 1, 'add_var.b': 10}, Result(output=Output(out=11), runtime=None, errored=False)), ({'add_var.a': 2, 'add_var.b': 10}, Result(output=Output(out=12), runtime=None, errored=False))]

 second list, b=100: [({'add_var.a': 1, 'add_var.b': 100}, Result(output=Output(out=101), runtime=None, errored=False)), ({'add_var.a': 2, 'add_var.b': 100}, Result(output=Output(out=102), runtime=None, errored=False))]


In [22]:
# Combining over every split input collapses the output into a single flat list.
task7 = add_var(a=[1, 2], b=[10, 100])
task7.split(["a", "b"])
# combining all inputs
task7.combine(["a", "b"])
task7()
task7.result()

[Result(output=Output(out=11), runtime=None, errored=False),
 Result(output=Output(out=101), runtime=None, errored=False),
 Result(output=Output(out=12), runtime=None, errored=False),
 Result(output=Output(out=102), runtime=None, errored=False)]

In [23]:
@pydra.mark.task
def moment(lst, n):
    """Return the mean of the elements of ``lst`` each raised to the power ``n``."""
    powers_total = 0
    for value in lst:
        powers_total = powers_total + value ** n
    return powers_total / len(lst)

# No splitter is set: lst reaches the function as one list, so the run
# produces a single Result rather than a list of them.
task8 = moment(n=3, lst=[2, 3, 4])

task8()
task8.result()

Result(output=Output(out=33.0), runtime=None, errored=False)

In [24]:
@pydra.mark.task
def power(x, n):
    """Return ``x`` raised to the power ``n``."""
    return pow(x, n)

In [25]:
# Outer splitter over x and n, combined over "x": one list of powers per exponent n.
task_ex1 = power(x=[2, 3, 4, 5], n=[2, 3])
task_ex1 = task_ex1.split(["x", "n"])
task_ex1 = task_ex1.combine("x")
task_ex1()
task_ex1.result()

[[Result(output=Output(out=4), runtime=None, errored=False),
  Result(output=Output(out=9), runtime=None, errored=False),
  Result(output=Output(out=16), runtime=None, errored=False),
  Result(output=Output(out=25), runtime=None, errored=False)],
 [Result(output=Output(out=8), runtime=None, errored=False),
  Result(output=Output(out=27), runtime=None, errored=False),
  Result(output=Output(out=64), runtime=None, errored=False),
  Result(output=Output(out=125), runtime=None, errored=False)]]

In [26]:
# Fetch the combined results once and reuse them for both lists.
# Group 0 holds the n=2 results (squares), group 1 the n=3 results (cubes).
combined_results = task_ex1.result()
squares_list = [el.output.out for el in combined_results[0]]
cubes_list = [el.output.out for el in combined_results[1]]
print(f"squares: {squares_list}")
print(f"cubes: {cubes_list}")

squares: [4, 9, 16, 25]
cubes: [8, 27, 64, 125]


In [27]:
import time

@pydra.mark.task
def add_two_sleep(x):
    """Pause for one second, then return ``x`` increased by two."""
    delay_seconds = 1
    time.sleep(delay_seconds)
    shifted = x + 2
    return shifted

# Four inputs, each sleeping for 1 s; the elapsed time shows whether the split
# runs were executed concurrently (the recorded run took about 1.3 s in total).
# perf_counter() is used for the measurement: it is monotonic, whereas
# time.time() can jump when the system clock is adjusted.
task9 = add_two_sleep(x=[1, 2, 3, 4]).split("x")
t0 = time.perf_counter()
task9()
print(f'total time: {time.perf_counter() - t0}')
task9.result()

total time: 1.3276221752166748


[Result(output=Output(out=3), runtime=None, errored=False),
 Result(output=Output(out=4), runtime=None, errored=False),
 Result(output=Output(out=5), runtime=None, errored=False),
 Result(output=Output(out=6), runtime=None, errored=False)]

In [28]:
# Same workload, but run through an explicitly created Submitter with the "cf"
# plugin, passed to the task call.
# perf_counter() is used for the measurement: it is monotonic, whereas
# time.time() can jump when the system clock is adjusted.
task10 = add_two_sleep(x=[1, 2, 3, 4]).split("x")

t0 = time.perf_counter()
with pydra.Submitter(plugin="cf") as sub:
    task10(submitter=sub)
print(f'total time: {time.perf_counter() - t0}')
print(f"results: {task10.result()}")

total time: 1.3208189010620117
results: [Result(output=Output(out=3), runtime=None, errored=False), Result(output=Output(out=4), runtime=None, errored=False), Result(output=Output(out=5), runtime=None, errored=False), Result(output=Output(out=6), runtime=None, errored=False)]


In [29]:
# Same workload, selecting the "cf" plugin by name directly in the task call
# instead of creating a Submitter first.
# perf_counter() is used for the measurement: it is monotonic, whereas
# time.time() can jump when the system clock is adjusted.
task11 = add_two_sleep(x=[1, 2, 3, 4]).split("x")

t0 = time.perf_counter()
task11(plugin="cf")
print(f'total time: {time.perf_counter() - t0}')
print(f"results: {task11.result()}")

total time: 1.3092260360717773
results: [Result(output=Output(out=3), runtime=None, errored=False), Result(output=Output(out=4), runtime=None, errored=False), Result(output=Output(out=5), runtime=None, errored=False), Result(output=Output(out=6), runtime=None, errored=False)]


In [30]:
# Same workload, this time handing the task to the Submitter (sub(runnable=...))
# rather than passing the Submitter to the task.
# perf_counter() is used for the measurement: it is monotonic, whereas
# time.time() can jump when the system clock is adjusted.
task12 = add_two_sleep(x=[1, 2, 3, 4]).split("x")

t0 = time.perf_counter()
with pydra.Submitter(plugin="cf") as sub:
    sub(runnable=task12)
print(f'total time: {time.perf_counter() - t0}')
print(f"results: {task12.result()}")

total time: 1.291790246963501
results: [Result(output=Output(out=3), runtime=None, errored=False), Result(output=Output(out=4), runtime=None, errored=False), Result(output=Output(out=5), runtime=None, errored=False), Result(output=Output(out=6), runtime=None, errored=False)]


In [31]:
# Same workload with the Submitter limited to n_procs=2: the four 1 s runs then
# need roughly two rounds (the recorded run took about 2.4 s instead of 1.3 s).
# perf_counter() is used for the measurement: it is monotonic, whereas
# time.time() can jump when the system clock is adjusted.
task13 = add_two_sleep(x=[1, 2, 3, 4]).split("x")

t0 = time.perf_counter()
with pydra.Submitter(plugin="cf", n_procs=2) as sub:
    sub(runnable=task13)
print(f'total time: {time.perf_counter() - t0}')
print(f"results: {task13.result()}")

total time: 2.362989902496338
results: [Result(output=Output(out=3), runtime=None, errored=False), Result(output=Output(out=4), runtime=None, errored=False), Result(output=Output(out=5), runtime=None, errored=False), Result(output=Output(out=6), runtime=None, errored=False)]
