# Basic examples
In this notebook we show how to write a basic pipeline with the **desipipe** framework. You first need to install **desipipe** with:
```
python -m pip install git+https://github.com/cosmodesi/desipipe#egg=desipipe
```

## Task manager
Let's consider a simple example: the Monte-Carlo estimation of $\pi$.
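
The estimate relies on the fact that points drawn uniformly in the square $[-1, 1]^2$ fall inside the unit disk with probability $\pi / 4$. As a minimal sketch of the computation itself, without **desipipe** and using only the standard library (the helper name `fraction_inside` is ours, for illustration only):

```python
import random


def fraction_inside(seed=42, size=10000):
    """Fraction of points drawn uniformly in [-1, 1]^2 that fall inside the unit disk."""
    rng = random.Random(seed)  # one reproducible random stream per seed
    inside = 0
    for _ in range(size):
        x, y = rng.uniform(-1, 1), rng.uniform(-1, 1)
        inside += (x**2 + y**2) < 1.
    return inside / size


# Average 20 independent estimates of pi / 4, then multiply by 4
fractions = [fraction_inside(seed=i) for i in range(20)]
pi_estimate = 4. * sum(fractions) / len(fractions)
print('pi is ~ {:.4f}'.format(pi_estimate))
```

The pipeline below follows the same two steps, except that each call becomes a task distributed among workers.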

In [1]:
import time

from desipipe import Queue, Environment, TaskManager, FileManager

# Let's instantiate a Queue, which records all tasks to be performed
# spawn=True means a manager process is spawned to distribute the tasks among workers
# spawn=False only updates the queue, but no other process to run the tasks is spawned
# That can be updated afterwards, with e.g. the command line (see below):
# desipipe spawn -q ./_tests/test --spawn
queue = Queue('test', base_dir='_tests', spawn=True)
# Pool of 4 workers
# Any environment variable can be passed to Environment: it will be set when running the tasks below
tm = TaskManager(queue, environ=Environment(), scheduler=dict(max_workers=4))

# We decorate the function (task) with tm.python_app
@tm.python_app
def fraction(seed=42, size=10000):
    import time
    import numpy as np
    time.sleep(5)  # wait 5 seconds, just to show jobs are indeed run in parallel
    rng = np.random.RandomState(seed=seed)  # use the seed, so that each task draws its own reproducible sample
    x, y = rng.uniform(-1, 1, size), rng.uniform(-1, 1, size)
    return np.sum((x**2 + y**2) < 1.) * 1. / size

# Here we use another task manager, with only 1 worker
tm2 = tm.clone(scheduler=dict(max_workers=1))
@tm2.python_app  # in Python >= 3.9, this line and the one above can be merged into: @tm.clone(scheduler=dict(max_workers=1)).python_app
def average(fractions):
    import numpy as np
    return np.average(fractions) * 4.

# Let's add another task, to be run in a shell
@tm2.bash_app
def echo(avg):
    return ['echo', '-n', 'bash app says pi is ~ {:.4f}'.format(avg)]

t0 = time.time()
# The following line stacks all the tasks in the queue
fractions = [fraction(seed=i) for i in range(20)]
# fractions is a list of Future instances
# We can pass them to other tasks, which creates a dependency graph
avg = average(fractions)
ech = echo(avg)
# At this point jobs are submitted
print('Elapsed time: {:.4f}'.format(time.time() - t0))

Elapsed time: 0.8421
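
Submission returns almost immediately because each call only yields a Future; the actual work happens in the workers. As a loose analogy only (this is the standard library `concurrent.futures`, not how **desipipe** is implemented, and `slow_square` is a made-up task), the same submit-then-collect pattern looks like:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    time.sleep(0.1)  # stand-in for a long computation
    return x * x


t0 = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns a Future right away, much like calling a decorated task
    futures = [pool.submit(slow_square, i) for i in range(8)]
    submit_time = time.time() - t0
    # result() blocks until the corresponding task has finished
    total = sum(future.result() for future in futures)
print('Submitted in {:.4f} s, total = {:d}'.format(submit_time, total))
```

Unlike these standard-library futures, the **desipipe** futures above can be passed directly as arguments to other tasks, which is what builds the dependency graph.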


In [2]:
# result() returns the result of the function; the call blocks until the task has completed
# here, 20 tasks of 5 seconds each are distributed over 4 workers: at least 20 / 4 * 5 = 25 seconds
print(ech.out())
print('pi is ~ {:.4f}'.format(avg.result()))
print('Elapsed time: {:.1f}'.format(time.time() - t0))

bash app says pi is ~ 3.1418
pi is ~ 3.1418
Elapsed time: 30.9
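
The bash app returns a command as a list of arguments, and its standard output is what `ech.out()` prints above. A rough sketch of what running such a command amounts to, assuming a POSIX system where `echo` is available as an executable (the value of `avg` is a placeholder copied from the output above, and **desipipe** may run the command differently):

```python
import subprocess

avg = 3.1418  # placeholder, standing in for the result of average()
command = ['echo', '-n', 'bash app says pi is ~ {:.4f}'.format(avg)]
# Run the command, capturing standard output and error as text
completed = subprocess.run(command, capture_output=True, text=True, check=True)
out, errno = completed.stdout, completed.returncode
print(out)
```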


## Tips
If you re-execute the two cells above, the cached result is returned immediately.
If you modify e.g. ``fraction``, a new result (including that of ``average``, which depends on it) will be computed.
If you modify ``average``, only ``average`` will be computed again: the ``fraction`` tasks are not rerun.
The script can therefore be built incrementally: previous tasks are not rerun if they have not changed.
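
One way to picture this behaviour is a task identifier derived from the task's code, its arguments and the identifiers of the tasks it depends on. The sketch below is purely illustrative: `task_id` and `pipeline_ids` are hypothetical helpers, and the actual mechanism used by **desipipe** may differ.

```python
import hashlib
import json


def task_id(source, kwargs=None, requires=()):
    """Hypothetical identifier: hash of the code, the arguments and the upstream task ids."""
    payload = json.dumps({'source': source, 'kwargs': kwargs or {}, 'requires': list(requires)}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()[:12]


def pipeline_ids(fraction_src, average_src):
    fraction_ids = [task_id(fraction_src, {'seed': seed}) for seed in range(3)]
    return fraction_ids, task_id(average_src, requires=fraction_ids)


fraction_src = 'def fraction(seed=42, size=10000): ...'
average_src = 'def average(fractions): ...'
fids, aid = pipeline_ids(fraction_src, average_src)
# Editing fraction changes its ids, hence also the id of average, which depends on them
fids_new, aid_new = pipeline_ids(fraction_src + '  # edited', average_src)
# Editing average changes the id of average only
fids_same, aid_edited = pipeline_ids(fraction_src, average_src + '  # edited')
print(fids != fids_new, aid != aid_new, fids == fids_same, aid != aid_edited)
```

A task whose identifier is already known with a stored result does not need to be run again, which is the caching behaviour described above.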

In [3]:
# To delete the queue
# queue.delete()

## Command line
We provide a number of command line instructions to interact with queues: list queues, list the tasks in a queue, pause or resume a queue, retry tasks, spawn a manager process, and delete queues.

### List queues

In [4]:
%%bash
desipipe queues -q './_tests/*'

[000000.00]  06-26 07:24  desipipe                  INFO     Matching queues:
[000000.01]  06-26 07:24  desipipe                  INFO     Queue(size=22, state=ACTIVE, fn=/home/adematti/Bureau/DESI/NERSC/cosmodesi/desipipe/nb/_tests/test.sqlite)
WAITING   : 0
PENDING   : 0
RUNNING   : 0
SUCCEEDED : 22
FAILED    : 0
KILLED    : 0
UNKNOWN   : 0


### List tasks in a queue

In [5]:
%%bash
desipipe tasks -q ./_tests/test
# task state can be:
# WAITING  Waiting for requirements (other tasks) to finish
# PENDING  Eligible to be selected and run
# RUNNING  Running right now
# SUCCEEDED  Finished with errno = 0
# FAILED  Finished with errno != 0

[000000.11]  06-26 07:24  desipipe                  INFO     Tasks that are SUCCEEDED:
[000000.11]  06-26 07:24  desipipe                  INFO     jobid: 
[000000.11]  06-26 07:24  desipipe                  INFO     errno: 0
[000000.11]  06-26 07:24  desipipe                  INFO     err: 
[000000.11]  06-26 07:24  desipipe                  INFO     out: 
[000000.11]  06-26 07:24  desipipe                  INFO     jobid: 
[000000.11]  06-26 07:24  desipipe                  INFO     errno: 0
[000000.11]  06-26 07:24  desipipe                  INFO     err: 
[000000.11]  06-26 07:24  desipipe                  INFO     out: 
[000000.11]  06-26 07:24  desipipe                  INFO     jobid: 
[000000.11]  06-26 07:24  desipipe                  INFO     errno: 0
[000000.11]  06-26 07:24  desipipe                  INFO     err: 
[000000.11]  06-26 07:24  desipipe                  INFO     out: 
[000000.11]  06-26 07:24  desipipe                  INFO     jobid: 
[000000.11]  06-26 07:24 
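
The WAITING and PENDING states listed above can be illustrated with a toy model of the dependency graph. This is a sketch under our own assumptions, not **desipipe** code: `TaskState` and `update_waiting` are hypothetical names, and we assume a task becomes eligible only once all its requirements have succeeded.

```python
from enum import Enum


class TaskState(Enum):
    WAITING = 'WAITING'
    PENDING = 'PENDING'
    RUNNING = 'RUNNING'
    SUCCEEDED = 'SUCCEEDED'
    FAILED = 'FAILED'


def update_waiting(states, requires):
    """Promote WAITING tasks to PENDING once all their requirements have SUCCEEDED."""
    for name, state in states.items():
        required = requires.get(name, [])
        if state is TaskState.WAITING and all(states[req] is TaskState.SUCCEEDED for req in required):
            states[name] = TaskState.PENDING
    return states


requires = {'average': ['fraction_0', 'fraction_1'], 'echo': ['average']}
states = {'fraction_0': TaskState.SUCCEEDED, 'fraction_1': TaskState.RUNNING,
          'average': TaskState.WAITING, 'echo': TaskState.WAITING}
update_waiting(states, requires)
before = states['average']  # still WAITING: fraction_1 has not finished
states['fraction_1'] = TaskState.SUCCEEDED
update_waiting(states, requires)
after = states['average']  # now PENDING; echo keeps WAITING for average
print(before.value, after.value, states['echo'].value)
```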

### Pause a queue
When pausing a queue, all processes running tasks from this queue will stop (after they finish their current task).

In [6]:
%%bash
desipipe pause -q ./_tests/test
desipipe queues -q './_tests/*'  # state is now PAUSED

[000000.00]  06-26 07:24  desipipe                  INFO     Pausing queue Queue(size=22, state=ACTIVE, fn=/home/adematti/Bureau/DESI/NERSC/cosmodesi/desipipe/nb/_tests/test.sqlite)
[000000.00]  06-26 07:24  desipipe                  INFO     Matching queues:
[000000.01]  06-26 07:24  desipipe                  INFO     Queue(size=22, state=PAUSED, fn=/home/adematti/Bureau/DESI/NERSC/cosmodesi/desipipe/nb/_tests/test.sqlite)
WAITING   : 0
PENDING   : 0
RUNNING   : 0
SUCCEEDED : 22
FAILED    : 0
KILLED    : 0
UNKNOWN   : 0
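
The "after they finish their current task" behaviour can be pictured as a worker loop that checks the queue state only between two tasks. The toy model below is a sketch of that idea (`ToyQueue` and `work` are hypothetical and unrelated to the actual **desipipe** classes):

```python
from collections import deque


class ToyQueue:
    """Toy stand-in for a queue: a list of callables and an ACTIVE / PAUSED state."""

    def __init__(self):
        self.tasks = deque()
        self.state = 'ACTIVE'
        self.results = []


def work(queue):
    # The state is checked between tasks only: the current task always runs to completion
    while queue.state == 'ACTIVE' and queue.tasks:
        task = queue.tasks.popleft()
        queue.results.append(task())


toy = ToyQueue()


def pause_and_return():
    toy.state = 'PAUSED'  # the queue is paused while this task is running
    return 'second'


toy.tasks.extend([lambda: 'first', pause_and_return, lambda: 'third'])
work(toy)
paused_results = list(toy.results)  # the third task has not been run
toy.state = 'ACTIVE'  # resume the queue
work(toy)
print(paused_results, toy.results)
```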


### Resume a queue
When resuming a queue, its tasks can be processed again.

In [7]:
%%bash
desipipe resume -q ./_tests/test  # pass --spawn to spawn a manager process that will distribute the tasks among workers
desipipe queues -q './_tests/*'  # state is now ACTIVE

[000000.00]  06-26 07:24  desipipe                  INFO     Resuming queue Queue(size=22, state=PAUSED, fn=/home/adematti/Bureau/DESI/NERSC/cosmodesi/desipipe/nb/_tests/test.sqlite)
[000000.00]  06-26 07:24  desipipe                  INFO     Matching queues:
[000000.01]  06-26 07:24  desipipe                  INFO     Queue(size=22, state=ACTIVE, fn=/home/adematti/Bureau/DESI/NERSC/cosmodesi/desipipe/nb/_tests/test.sqlite)
WAITING   : 0
PENDING   : 0
RUNNING   : 0
SUCCEEDED : 22
FAILED    : 0
KILLED    : 0
UNKNOWN   : 0


### Retry
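Reset the state of the selected tasks to PENDING, so that they are eligible to be run again. Here we select all tasks in state SUCCEEDED.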

In [8]:
%%bash
desipipe retry -q ./_tests/test --state SUCCEEDED
desipipe queues -q './_tests/*'  # task state is now PENDING

[000000.00]  06-26 07:24  desipipe                  INFO     Matching queues:
[000000.01]  06-26 07:24  desipipe                  INFO     Queue(size=22, state=ACTIVE, fn=/home/adematti/Bureau/DESI/NERSC/cosmodesi/desipipe/nb/_tests/test.sqlite)
WAITING   : 0
PENDING   : 22
RUNNING   : 0
SUCCEEDED : 0
FAILED    : 0
KILLED    : 0
UNKNOWN   : 0
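
In terms of bookkeeping, the retry above moves the 22 tasks from SUCCEEDED to PENDING. A minimal sketch with plain dictionaries, where `retry` and `summary` are hypothetical helpers that only mimic the counts printed by the command line:

```python
from collections import Counter

STATES = ['WAITING', 'PENDING', 'RUNNING', 'SUCCEEDED', 'FAILED', 'KILLED', 'UNKNOWN']


def retry(states, state='SUCCEEDED'):
    """Return a copy of ``states`` where tasks in ``state`` are reset to PENDING."""
    return {name: 'PENDING' if current == state else current for name, current in states.items()}


def summary(states):
    """Number of tasks in each state, one state per line."""
    counts = Counter(states.values())
    return '\n'.join('{:10s}: {:d}'.format(name, counts[name]) for name in STATES)


states = {'task_{:d}'.format(i): 'SUCCEEDED' for i in range(22)}
states = retry(states, state='SUCCEEDED')
print(summary(states))
```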


### Spawn a manager process
Spawn a manager process that will distribute the tasks among workers, using the scheduler and provider defined above.

In [9]:
%%bash
desipipe spawn -q ./_tests/test  # pass --spawn to spawn an independent process, and exit this one
desipipe queues -q './_tests/*'  # tasks are being reprocessed: RUNNING, then SUCCEEDED

[000000.00]  06-26 07:25  desipipe                  INFO     Matching queues:
[000000.01]  06-26 07:25  desipipe                  INFO     Queue(size=22, state=ACTIVE, fn=/home/adematti/Bureau/DESI/NERSC/cosmodesi/desipipe/nb/_tests/test.sqlite)
WAITING   : 0
PENDING   : 0
RUNNING   : 3
SUCCEEDED : 19
FAILED    : 0
KILLED    : 0
UNKNOWN   : 0


### Delete queue(s)

In [10]:
%%bash
desipipe delete -q './_tests/*'  # pass --force to actually delete the queue

[000000.00]  06-26 07:25  desipipe                  INFO     I will delete these queues:
[000000.01]  06-26 07:25  desipipe                  INFO     Queue(size=22, state=ACTIVE, fn=/home/adematti/Bureau/DESI/NERSC/cosmodesi/desipipe/nb/_tests/test.sqlite)
WAITING   : 0
PENDING   : 0
RUNNING   : 0
SUCCEEDED : 22
FAILED    : 0
KILLED    : 0
UNKNOWN   : 0


## File manager
The file manager aims to keep track of the files (of all kinds) produced during processing.

In [11]:
%%file '_tests/files.yaml'

description: Some text file
id: my_input_file
filetype: text
path: ${SOMEDIR}/in_{option1}_{i:d}.txt
author: Chuck Norris
options:
  option1: ['a', 'b']
  i: range(0, 3, 1)

Writing _tests/files.yaml


In [12]:
fm = FileManager('_tests/files.yaml', environ=dict(SOMEDIR='_tests'))
# To select files
fm2 = fm.select(keywords='text file', option1=['a'])
# Iterate over files
for fi in fm2:
    fi = fi.get()
    print(fi)
    # Write text
    fi.write('hello world!')

File(filetype=text, path=_tests/in_a_0.txt, id=my_input_file, author=Chuck Norris, options={'option1': 'a', 'i': 0}, description=Some text file)
File(filetype=text, path=_tests/in_a_1.txt, id=my_input_file, author=Chuck Norris, options={'option1': 'a', 'i': 1}, description=Some text file)
File(filetype=text, path=_tests/in_a_2.txt, id=my_input_file, author=Chuck Norris, options={'option1': 'a', 'i': 2}, description=Some text file)
File(filetype=text, path=_tests/in_b_0.txt, id=my_input_file, author=Chuck Norris, options={'option1': 'b', 'i': 0}, description=Some text file)
File(filetype=text, path=_tests/in_b_1.txt, id=my_input_file, author=Chuck Norris, options={'option1': 'b', 'i': 1}, description=Some text file)
File(filetype=text, path=_tests/in_b_2.txt, id=my_input_file, author=Chuck Norris, options={'option1': 'b', 'i': 2}, description=Some text file)
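
The six paths above are the combinations of the `options` declared in the *.yaml* entry, inserted into the `path` template after the environment variable has been substituted. A standard-library sketch of this expansion (our own illustration, which does not use `FileManager`; how **desipipe** performs it internally is not shown here):

```python
import itertools
from string import Template

path = '${SOMEDIR}/in_{option1}_{i:d}.txt'
options = {'option1': ['a', 'b'], 'i': range(0, 3, 1)}
environ = {'SOMEDIR': '_tests'}

# First substitute the environment variable, leaving the {option} fields untouched
template = Template(path).substitute(environ)
# Then format the template with every combination of option values
paths = [template.format(**dict(zip(options, values))) for values in itertools.product(*options.values())]
for p in paths:
    print(p)
```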


In [13]:
# To add a new entry
fm.db.append(dict(description='added file', id='added_file', filetype='catalog', path='test.fits'))
# To delete an entry
del fm.db[-1]
# To add a cloned entry
fm.db.append(fm.db[0].clone(id='my_output_file', path='${SOMEDIR}/out_{option1}_{i:d}.txt'))
fm.db.write('_tests/files.yaml')
# Display the new file database
!cat '_tests/files.yaml'

author: Chuck Norris
description: Some text file
filetype: text
id: my_input_file
options:
  i: range(0, 3)
  option1: [a, b]
path: ${SOMEDIR}/in_{option1}_{i:d}.txt
---
author: Chuck Norris
description: Some text file
filetype: text
id: my_output_file
options:
  i: range(0, 3)
  option1: [a, b]
path: ${SOMEDIR}/out_{option1}_{i:d}.txt


In practice, one would simply edit the *.yaml* file directly.

In [14]:
# Let's add a new task!
@tm.python_app
def copy(text_in, text_out):
    import numpy as np  # just to illustrate that the package version is tracked
    text = text_in.read()
    text += ' this is my first message'
    print('saving', text_out.rpath)
    text_out.write(text)

In [15]:
# Iterate over files
for fi in fm:
    copy(fi.get(id='my_input_file'), fi.get(id='my_output_file'))

# Let's spawn a new process, as the previous one has finished (there was no work left!)
from desipipe import spawn
spawn(queue)

In [16]:
!ls -a _tests/

.	   files.yaml  in_a_2.txt  in_b_2.txt	out_a_2.txt  out_b_2.txt
..	   in_a_0.txt  in_b_0.txt  out_a_0.txt	out_b_0.txt  test.sqlite
.desipipe  in_a_1.txt  in_b_1.txt  out_a_1.txt	out_b_1.txt


In [17]:
!cat _tests/out_a_0.txt

hello world! this is my first message

In [18]:
# This is where desipipe processing information is saved
!ls -a _tests/.desipipe
print('\n*.py file is:')
!cat _tests/.desipipe/copy.py
print('\n*.versions file is:')
!cat _tests/.desipipe/copy.versions

.  ..  copy.py	copy.versions

*.py file is:
def copy(text_in, text_out):
    import numpy as np  # just to illustrate that the package version is tracked
    text = text_in.read()
    text += ' this is my first message'
    print('saving', text_out.rpath)
    text_out.write(text)

*.versions file is:
json=2.0.9
numpy=1.24.3
ctypes=1.1.0


In [19]:
# Delete queue
queue.delete()