### Sharing Task Input Data

RP supports the concurrent execution of many tasks and, often, these tasks share some or all their input data, i.e., files. We have seen earlier that input staging can incur a significant runtime overhead. Such an overhead can be significantly reduced by avoiding redundant file staging operations.

Each RP pilot manages a shared data space where to store tasks’ input files. First, RP can stage input files into the shared data space of a pilot. Second, that pilot can create symbolic links (symlinks) in the work directory of each task to any file in the shared data space. In this way, set of tasks can access the same file, avoiding costly staging and replicating operations.

Stage shared data from `pwd` to the pilot's shared data space

```python
pilot.stage_in({'source': 'file://%s/input.dat' % os.getcwd(),
                'target': 'staging:///input.dat',
                'action': rp.TRANSFER})
```

Create a symlink in the work directory of each task to the file <i>input.dat</i>

```python 
for i in range(0, n):
    cud = rp.TaskDescription()

    cud.executable     = '/usr/bin/wc'
    cud.arguments      = ['-c', 'input.dat']
    cud.input_staging  = {'source': 'staging:///input.dat',
                          'target': 'input.dat',
                          'action': rp.LINK}
```

The rp.LINK staging action creates a symlink, avoiding the copy operation used by the rp.TRANSFER action.

<b>Note:</b> Unlike other methods in RP, the pilot.stage_in method is synchronous, i.e., it only returns once the transfer is completed. This may change in a future version of RP.

<h2>Running the Example</h2>

The output of the below example is the same as <b>section 3.6</b>, but the script should run significantly faster due to the removed staging redundancy, especially for non-local pilots.

In [2]:
from dotenv import load_dotenv
load_dotenv()


True

We start by importing the radical.pilot module and initializing the reporter facility used for printing well formatted runtime and progress information.

In [3]:
import os
import sys

verbose  = os.environ.get('RADICAL_PILOT_VERBOSE', 'REPORT')
os.environ['RADICAL_PILOT_VERBOSE'] = verbose

import radical.pilot as rp
import radical.utils as ru

report = ru.Reporter(name='radical.pilot')
report.title('Getting Started (RP version %s)' % rp.version)

[94m[1m
[39m[0m[94m[1m Getting Started (RP version 1.12.0)                                            
[39m[0m[94m[1m
[39m[0m

We will now import the dotenv module for fetching our environment variables. To create a new Session, you need to provide the URL of a MongoDB server which we will fetch from our .env file.

We will set the resource value to 'local.localhost'. Using a resource key other than local.localhost implicitly tells RADICAL-Pilot that it is targeting a remote resource.

In [4]:
RADICAL_PILOT_DBURL = os.getenv("RADICAL_PILOT_DBURL")
os.environ['RADICAL_PILOT_DBURL'] = RADICAL_PILOT_DBURL
resource = 'local.localhost'
session = rp.Session()

[94mnew session: [39m[0m[rp.session.radical.3.anindita.019100.0000][39m[0m[94m                       \
database   : [39m[0m[mongodb://anindita:****@95.217.193.116:27017/anindita][39m[0m

RuntimeError: bridge startup failed

In the <i>create_pilot_description</i> function, we create a dictionary to initialize the pilot description object.

In [None]:
def create_pilot_description(resources):
    report.info('read config')
    config = ru.read_json('../config.json')
    report.ok('>>ok\n')

    report.header('submit pilots')

    pmgr = rp.PilotManager(session=session)

    pd_init = {
               'resource'      : resource,
               'runtime'       : 15,
               'exit_on_error' : True,
               'project'       : config[resource].get('project', None),
               'queue'         : config[resource].get('queue', None),
               'access_schema' : config[resource].get('schema', None),
               'cores'         : config[resource].get('cores', 1),
               'gpus'          : config[resource].get('gpus', 0),
              }
    pdesc = rp.PilotDescription(pd_init)
    return pdesc


<i>launch_pilots</i> adds a PilotManager for managing one or more pilots.

In [None]:
def launch_pilots(session,pdesc):
    pmgr = rp.PilotManager(session=session)
    pilots = pmgr.submit_pilots(pdesc)
    return pilots    

Create a workload of char-counting a simple file.  We first create the file right here, and stage it to the pilot 'shared_data' space. We then synchronously stage the data to the pilot


In [None]:
def create_and_stage_input_data():
    os.system('hostname >  input.dat')
    os.system('date     >> input.dat')

    report.info('stage in shared data')
    pilot.stage_in({'source': 'client:///input.dat',
                    'target': 'pilot:///input.dat',
                    'action': rp.TRANSFER})
    report.ok('>>ok\n')


In the <i>submit_tasks</i> function, we first register the pilot in a TaskManager object. 

After this we initialize the number of tasks(n=128) and create a new Task description. We submit the previously created Task descriptions to the PilotManager. This will trigger the selected scheduler to start assigning Tasks to the Pilots.

We will use ```tmgr.wait_tasks()```to wait for all tasks to reach a final (DONE, CANCELED or FAILED).

In [None]:
def submit_tasks(pilots):
    report.header('submit tasks')

    tmgr = rp.TaskManager(session=session)
    tmgr.add_pilots(pilot)

    n = 128  
    report.info('create %d task description(s)\n\t' % n)

    tds = list()
    outs = list()
    for i in range(0, n):
        td = rp.TaskDescription()
        td.executable     = '/bin/cat'
        td.arguments      = ['input.dat']
        td.stdout         = 'STDOUT'
        td.input_staging  = {'source': 'pilot:///input.dat',
                             'target': 'task:///input.dat',
                             'action': rp.LINK}
        td.output_staging = {'source': 'task:///STDOUT',
                             'target': 'pilot:///STDOUT.%06d' % i,
                             'action': rp.COPY}
        outs.append('STDOUT.%06d' % i)
        tds.append(td)
        report.progress()
    report.ok('>>ok\n')

    tasks = tmgr.submit_tasks(tds)
    report.header('gather results')
    tmgr.wait_tasks()
    return tasks
    

We create the <i>report_task_progress</i> function to report the task status of each task

In [1]:
def report_task_progress(tasks):
    report.info('\n')
    for task in tasks:
        report.plain('  * %s: %s, exit: %3s, out: %s\n'
                % (task.uid, task.state[:4],
                    task.exit_code, task.stdout.strip()[:35]))

    os.system('rm input.dat')
    
    report.info('stage out shared data')
    pilot.stage_out([{'source': 'pilot:///%s'  % fname,
                      'target': 'client:///%s' % fname,
                      'action': rp.TRANSFER} for fname in outs])
    report.ok('>>ok\n')

We put all function calls inside a try except block.  Finally, always clean up the session no matter if we caught an exception or not. This will kill all the remaining pilots.

In [None]:
try:
    pdesc = create_pilot_description(resources)
    pilots = launch_pilots(session,pdesc)
    create_and_stage_input_data()
    tasks = submit_tasks(pilots)
    report_task_progress(tasks)
except Exception as e:
    report.error('caught Exception: %s\n' % e)
    raise
except (KeyboardInterrupt, SystemExit):
    report.warn('exit requested\n')
finally:
    report.header('finalize')
    session.close()
report.header()