Creating Nowcast Worker Modules

Nowcast workers are Python modules that can be imported from nowcast.workers. They are composed of some standard code that lets them interface with the nowcast system messaging and logging framework, and one or more functions that execute their task in the nowcast system. Most of the standard code is centred around setting up a NowcastWorker object and calling methods on it. The worker object is an instance of the nemo_nowcast.worker.NowcastWorker class.

Skeleton Worker Example

Here is a skeleton example of a worker module showing the standard code. It is explained, line by line, below. Actual (and obviously, more complicated) worker modules can be found in:

  • BuiltinWorkers
  • gomssnowcast:GoMSS_NowcastSystemWorkers
  • salishseanowcast:SalishSeaNowcastSystemWorkers

"""NEMO Nowcast worker to ...

...
"""
import logging

from nemo_nowcast import NowcastWorker


NAME = 'worker_name'
logger = logging.getLogger(NAME)


def main():
    """Set up and run the worker.

    For command-line usage see:

    :command:`python -m nemo_nowcast.workers.worker_name --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.run(worker_func, success, failure)


def success(parsed_args):
    logger.info('success message')
    msg_type = 'success'
    return msg_type


def failure(parsed_args):
    logger.critical('failure message')
    msg_type = 'failure'
    return msg_type


def worker_func(parsed_args, config, tell_manager):
    ...
    return checklist


if __name__ == '__main__':
    main()

Lines 1 through 4 are the module's triple-quoted docstring. It will appear in auto-generated documentation of the module. For convenience we also use the docstring as the description element of the worker's command-line help message, although that is easily changed if you prefer to put more detail in the docstring than you want to appear in the help text.

The minimum set of imports that a worker needs are:

import logging

from nemo_nowcast import NowcastWorker

The logging module is a Python standard library module that provides the mechanism we use to print output about the worker's progress and status to the log file or the screen; effectively, developer-approved print statements on steroids :-) The NowcastWorker class provides the interface to the nowcast framework.

Obviously you will need to import whatever other modules your worker needs for its task.

Next up, on lines 10 and 11, are 2 module-level variables:

NAME = 'worker_name'
logger = logging.getLogger(NAME)

NAME is used to identify the source of logging messages and of messages exchanged between the worker and the nowcast manager process.

logger is our interface to the Python standard library logging framework, and we give this module's instance the worker's name.

Python scoping rules make module-level variables available for use in any function in the module without passing them as arguments, but assigning new values to them elsewhere in the module will surely mess things up.
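The scoping behaviour described above can be sketched with only the standard library. The function names report_progress and shadowing_assignment are hypothetical, chosen for illustration:

```python
import logging

NAME = 'worker_name'
logger = logging.getLogger(NAME)


def report_progress():
    # Reading a module-level name inside a function needs no declaration.
    logger.info('working...')
    return logger.name


def shadowing_assignment():
    # Assigning to the name creates a *local* variable that shadows the
    # module-level logger inside this function only.
    logger = logging.getLogger('something_else')
    return logger.name


print(report_progress())       # worker_name
print(shadowing_assignment())  # something_else
print(logger.name)             # worker_name
```

The assignment in shadowing_assignment silently logs under a different name, and adding a global statement would instead change the logger for every function in the module. Either way it is simplest to treat NAME and logger as read-only.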

The main Function

The main function is where our worker gets down to work. It is called when the worker is run from the command line by virtue of the

if __name__ == '__main__':
    main()

stanza at the end of the module.

The minimum possible main function is shown in lines 14 to 23:

def main():
    """Set up and run the worker.

    For command-line usage see:

    :command:`python -m nemo_nowcast.workers.worker_name --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.run(worker_func, success, failure)

The main function docstring will appear in auto-generated documentation of the module.

First, we create an instance of the NowcastWorker class that we call, by convention, worker. The NowcastWorker constructor takes 2 arguments:

  • the NAME that we defined as a module-level variable above
  • a description string that is used as the description element of the worker's command-line help message; here we use the worker's module docstring (which is automatically stored in the __doc__ module-level variable)

    The description part of the help message is the paragraph after the usage, for example:

    (nowcast)$ python -m nowcast.workers.download_weather --help
    usage: python -m nowcast.workers.download_weather
           [-h] [--debug] [--yesterday] config_file {18,00,12,06}
    
    Salish Sea NEMO nowcast weather model dataset download worker. Download the
    GRIB2 files from today's 00, 06, 12, or 18 EC GEM 2.5km HRDPS operational
    model forecast.
    
    ...

See the nemo_nowcast.worker.NowcastWorker documentation for details of the NowcastWorker object's constructor arguments, other attributes, and methods.

Next, we call the init_cli method on the worker to initialize the worker's command-line interface (CLI). The default worker command-line interface requires a nowcast config file name, and provides --debug, --help, and -h options. The worker's CLI can be extended with additional command-line arguments and/or options. Please see Extending the Command Line Interface for details.

Finally, we call the run method on the worker to do the actual work. The run method takes 3 function names as arguments:

  • worker_func is the name of the function that does the worker's job
  • success is the name of the function to be called when the worker finishes successfully
  • failure is the name of the function to be called when the worker fails

All 3 functions must be defined in the worker module. Their required call signatures and return values are described below.
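To make the relationship between the 3 functions concrete, here is a rough sketch of the dispatch. run_sketch is a hypothetical stand-in written for this illustration; it is not the NowcastWorker.run implementation, which also handles configuration loading, logging setup, and messaging with the manager. The failing_func worker and the dummy tell_manager are likewise assumptions.

```python
import argparse
import logging

logger = logging.getLogger('worker_name')


def run_sketch(worker_func, success, failure, parsed_args, config):
    """Hypothetical stand-in for the dispatch done by worker.run."""
    def tell_manager(*args):
        # Dummy placeholder; the real method exchanges messages with the manager.
        return None

    try:
        checklist = worker_func(parsed_args, config, tell_manager)
    except Exception:
        # The worker function failed: ask failure() for the message type.
        return failure(parsed_args), None
    # The worker function returned normally: ask success() for the message type.
    return success(parsed_args), checklist


def success(parsed_args):
    logger.info('success message')
    return 'success'


def failure(parsed_args):
    logger.critical('failure message')
    return 'failure'


def worker_func(parsed_args, config, *args):
    return {'done': True}


def failing_func(parsed_args, config, *args):
    raise RuntimeError('simulated failure')


parsed_args = argparse.Namespace(config_file='nowcast.yaml', debug=False)
print(run_sketch(worker_func, success, failure, parsed_args, {}))
print(run_sketch(failing_func, success, failure, parsed_args, {}))
```

The point of the sketch is only the ordering: the worker function runs first, and exactly one of success or failure is then called with the same parsed_args.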

success and failure Functions

The success function is called when the worker successfully completes its task. It is used to generate the message that is sent to the nowcast manager process to indicate the worker's success so that the nowcast automation can proceed to the next appropriate worker(s). A minimal success function is shown in lines 26 through 29:

def success(parsed_args):
    logger.info('success message')
    msg_type = 'success'
    return msg_type

The name of the function is success by convention, but it could be anything provided that it is the 2nd argument passed to the worker.run method.

The success function must accept exactly 1 argument, named parsed_args by convention. It is an argparse.Namespace object that has the worker's command-line argument names and values as attributes. Even if your success function does not use parsed_args, it must still be included in the function definition.

The success function should send a message via logger.info to the logging system that describes the worker's success.

The success function must return a string that is a key registered for the worker in the MessageRegistryConfig section of the NowcastConfigFile. The returned key specifies the message type that is sent to the nowcast manager process to indicate the worker's success.

Here is a more sophisticated example of a success function from the GoMSS Nowcast package download_weather <gomssnowcast:DownloadWeatherWorker> worker:

def success(parsed_args):
    logger.info(
        '{date} weather forecast file downloads complete'
        .format(date=parsed_args.forecast_date.format('YYYY-MM-DD')))
    msg_type = 'success'
    return msg_type

The failure function is very similar to the success function except that it is called if the worker fails to complete its task. It is used to generate the message that is sent to the nowcast manager process to indicate the worker's failure so that appropriate notifications can be produced and/or remedial action(s) taken. A minimal failure function is shown on lines 32 through 35:

def failure(parsed_args):
    logger.critical('failure message')
    msg_type = 'failure'
    return msg_type

The name of the function is failure by convention, but it could be anything provided that it is the 3rd argument passed to the worker.run method.

Like the success function, the failure function must accept exactly 1 argument, named parsed_args by convention. It is an argparse.Namespace object that has the worker's command-line argument names and values as attributes. Even if your failure function does not use parsed_args, it must still be included in the function definition.

The failure function should send a message via logger.critical to the logging system that describes the worker's failure.

The failure function must return a string that is a key registered for the worker in the MessageRegistryConfig section of the NowcastConfigFile. The returned key specifies the message type that is sent to the nowcast manager process to indicate the worker's failure.
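The requirement that the returned strings be registered keys can be illustrated with a small check. The message_registry dictionary below is a hypothetical stand-in whose layout is assumed for this sketch; the real structure is whatever the message registry section of your nowcast config file defines. The is_registered helper is also hypothetical.

```python
import argparse

# Hypothetical stand-in for the worker's entry in the message registry;
# the keys are message types and the values describe them.
message_registry = {
    'worker_name': {
        'success': 'worker_name finished successfully',
        'failure': 'worker_name failed',
    },
}


def success(parsed_args):
    return 'success'


def failure(parsed_args):
    return 'failure'


def is_registered(worker_name, msg_type):
    """Return True if msg_type is a key registered for worker_name."""
    return msg_type in message_registry.get(worker_name, {})


# parsed_args carries the command-line values as attributes.
parsed_args = argparse.Namespace(config_file='nowcast.yaml', debug=False)
print(is_registered('worker_name', success(parsed_args)))
print(is_registered('worker_name', failure(parsed_args)))
print(is_registered('worker_name', 'sucess'))  # a misspelled key is not registered
```

A check like this is handy in unit tests, because a misspelled message type would otherwise only surface when the manager receives it.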

Doing the Work

Lines 38 through 40 show the required call signature and return value for the function that is called to do the worker's task:

def worker_func(parsed_args, config, tell_manager):
    ...
    return checklist

The name of the function can be anything provided that it is the 1st argument passed to the worker.run method. Ideally, the function name should be descriptive of the worker's task. If you can't think of anything else, you can use the name of the worker module.

The function must accept exactly 3 arguments:

  • The 1st argument is named parsed_args by convention. It is an argparse.Namespace object that has the worker's command-line argument names and values as attributes. Even if your function does not use parsed_args, it must still be included in the function definition.
  • The 2nd argument is named config by convention. It is a nemo_nowcast.config.Config object that provides dict-like access to the nowcast system configuration loaded from the NowcastConfigFile. Even if your function does not use config, it must still be included in the function definition.
  • The 3rd argument is named tell_manager by convention. It is the worker's nemo_nowcast.worker.NowcastWorker.tell_manager method. That method provides a mechanism for the exchange of messages with the nowcast manager process. Few workers need to do that, so tell_manager is often replaced by *args in the function signature:

    def worker_func(parsed_args, config, *args):

    Please see the SalishSeaNowcast package watch_NEMO <salishseanowcast:WatchNEMO-Worker> worker for examples of the use of tell_manager.

The function must return a Python dict, known as checklist by convention. checklist must contain at least 1 key/value pair that provides information about the worker's successful completion. checklist is sent to the nowcast manager process as the payload of the worker's success message. A simple example of a checklist from the GoMSS Nowcast package download_weather <gomssnowcast:DownloadWeatherWorker> worker is:

checklist = {
    '{date} forecast'
    .format(date=parsed_args.forecast_date.format('YYYY-MM-DD')): True}

which indicates that the particular forecast download was successful. A more sophisticated checklist, such as the one produced by the SalishSeaNowcast package get_NeahBay_ssh <salishseanowcast:MakeSshFilesWorker> worker, contains multiple keys and lists of filenames.
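As an illustration of the call signature and a multi-key checklist, here is a minimal sketch. The function name list_forecast_files, the config keys, and the file naming pattern are all hypothetical. A plain dict and an argparse.Namespace with a string date stand in for the Config object and the parsed command line, and no files are actually downloaded.

```python
import argparse


def list_forecast_files(parsed_args, config, *args):
    """Build a checklist of the files a download worker would produce."""
    # dict-like access to the configuration (hypothetical keys)
    dest_dir = config['weather']['dest dir']
    filenames = [
        '{dir}/{date}_{hour:02d}.grib2'.format(
            dir=dest_dir, date=parsed_args.forecast_date, hour=hour)
        for hour in (0, 6)
    ]
    checklist = {
        '{date} forecast'.format(date=parsed_args.forecast_date): True,
        'files': filenames,
    }
    return checklist


parsed_args = argparse.Namespace(forecast_date='2024-01-15')
config = {'weather': {'dest dir': '/results/forcing'}}
checklist = list_forecast_files(parsed_args, config)
print(checklist)
```

Because tell_manager is not needed here, the signature uses *args so that the function still accepts the 3rd argument that worker.run passes.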

The function whose name is passed as the 1st argument to the worker.run method can be a driver function that calls other functions in the worker module to subdivide the worker task into smaller, more readable, and more testable sections. By convention, such "2nd level" functions are marked as private by prefixing their names with the _ (underscore) character; e.g. _calc_date. This is in line with the Python language convention that functions and methods that start with an underscore should not be called outside the module in which they are defined.

The worker should send messages to the logging system that indicate its progress. Messages sent via logger.info appear in the nowcast.log file. Info level logging should be used for "high level" progress messages, and preferably not used inside loops. Messages logged via logger.debug can be used for more detailed logging. Those messages appear in the nowcast.debug.log file.
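The split between info and debug messages can be tried out with the standard library alone. In this sketch two in-memory streams stand in for nowcast.log and nowcast.debug.log; in the nowcast system the handlers are set up by the framework, so the handler configuration and the process_files function here are assumptions for illustration only.

```python
import io
import logging

NAME = 'worker_name'
logger = logging.getLogger(NAME)
logger.setLevel(logging.DEBUG)
logger.propagate = False

# In-memory stand-ins for the two log files
info_stream, debug_stream = io.StringIO(), io.StringIO()
info_handler = logging.StreamHandler(info_stream)
info_handler.setLevel(logging.INFO)
debug_handler = logging.StreamHandler(debug_stream)
debug_handler.setLevel(logging.DEBUG)
logger.addHandler(info_handler)
logger.addHandler(debug_handler)


def process_files(filenames):
    # High-level progress: info level, outside the loop
    logger.info('processing %d files', len(filenames))
    for name in filenames:
        # Per-item detail: debug level, inside the loop
        logger.debug('processing %s', name)
    logger.info('processing complete')


process_files(['a.nc', 'b.nc'])
print(info_stream.getvalue())
print(debug_stream.getvalue())
```

The info stream receives only the 2 high-level messages, while the debug stream receives those plus 1 line per file.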

If a worker function encounters an expected error condition (a file download failure or timeout, for example) it should send a message to the logging system via logger.critical and raise a nemo_nowcast.worker.WorkerError exception. Here is an example that handles an empty downloaded file in the SalishSeaNowcast package download_weather <salishseanowcast:DownloadWeatherWorker> worker:

if size == 0:
    logger.critical('Problem, 0 size file {}'.format(fileURL))
    raise WorkerError
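The same log-then-raise pattern is shown below as a self-contained sketch. The WorkerError class defined here is a local stand-in so that the example runs without the nowcast package installed; a real worker would use the nemo_nowcast.worker.WorkerError exception named above. The check_download function and the URL are hypothetical.

```python
import logging

logger = logging.getLogger('worker_name')


class WorkerError(Exception):
    """Local stand-in for nemo_nowcast.worker.WorkerError."""


def check_download(file_url, size):
    """Raise WorkerError for an empty download, otherwise return the size."""
    if size == 0:
        logger.critical('Problem, 0 size file %s', file_url)
        raise WorkerError
    return size


try:
    check_download('https://example.com/forecast.grib2', 0)
except WorkerError:
    print('worker failed; the failure function would be called')
```

Logging before raising means the log records what went wrong even though the exception itself carries no message.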

This section has only outlined the basic code structure and conventions for writing nowcast workers. The best way to learn how to write a new worker is by studying the code in existing worker modules, for example:

  • BuiltinWorkers
  • gomssnowcast:GoMSS_NowcastSystemWorkers
  • salishseanowcast:SalishSeaNowcastSystemWorkers

Extending the Command Line Interface

Generic Arguments

If you need to add a command-line argument to a worker you can do so by calling the worker.cli.add_argument method. Here is an example from the SalishSeaNowcast package watch_NEMO <salishseanowcast:WatchNEMO-Worker> worker:

def main():
    """For command-line usage see:

    :command:`python -m nowcast.workers.watch_NEMO --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.cli.add_argument("host_name", help="Name of the host to monitor the run on")
    worker.cli.add_argument(
        "run_type",
        choices={"nowcast", "nowcast-green", "nowcast-dev", "forecast", "forecast2"},
        help="""
        Type of run to monitor:
        'nowcast' means nowcast physics run,
        'nowcast-green' means nowcast green ocean run,
        'forecast' means updated forecast run,
        'forecast2' means preliminary forecast run,
        """,
    )
    worker.run(watch_NEMO, success, failure)

The worker.cli.add_argument method is documented at nemo_nowcast.cli.CommandLineInterface.add_argument. It takes the same arguments as the Python standard library argparse.ArgumentParser.add_argument method.
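Because the arguments are the same, you can prototype a worker's extra arguments against a plain argparse.ArgumentParser before wiring them into the worker. In this sketch the config_file and --debug definitions imitate what init_cli provides; their help strings, the host name, and the reduced set of choices are illustrative assumptions.

```python
import argparse

parser = argparse.ArgumentParser(prog='python -m nowcast.workers.watch_NEMO')
# Stand-ins for what worker.init_cli sets up
parser.add_argument('config_file', help='Nowcast config file name')
parser.add_argument('--debug', action='store_true', help='Enable debug output')
# The worker-specific arguments, exactly as they would be passed to
# worker.cli.add_argument
parser.add_argument('host_name', help='Name of the host to monitor the run on')
parser.add_argument(
    'run_type',
    choices={'nowcast', 'forecast'},
    help='Type of run to monitor',
)

parsed_args = parser.parse_args(['nowcast.yaml', 'example-host', 'forecast'])
print(parsed_args.host_name, parsed_args.run_type, parsed_args.debug)
```

Positional arguments are parsed in the order they were added, so the worker-specific ones follow config_file on the command line.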

Note

The worker.init_cli method initializes the worker's command-line interface to provide help messages, and to handle the config_file argument and the --debug option.

Date Options

The fairly common need to add a date option to a worker's CLI is simplified by the worker.cli.add_date_option method. Here is an example from the GoMSS Nowcast package download_weather <gomssnowcast:DownloadWeatherWorker> worker:

def main():
    """Set up and run the worker.

    For command-line usage see:

    :command:`python -m nowcast.workers.download_weather --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.cli.add_date_option(
        '--forecast-date', default=arrow.now().floor('day'),
        help='Date for which to download the weather forecast.')
    worker.run(download_weather, success, failure)

This adds a --forecast-date option to the CLI. Its default value is an Arrow object whose value is midnight on the current date. It will be available in the worker functions as parsed_args.forecast_date. The help message for the option is:

Date for which to download the weather forecast. Use YYYY-MM-DD format. Defaults to {default}.

where {default} is the value of default passed into worker.cli.add_date_option formatted as YYYY-MM-DD.

The worker.cli.add_date_option method is documented at nemo_nowcast.cli.CommandLineInterface.add_date_option.

Note

The Arrow object produced by worker.cli.add_date_option is timezone-aware and its timezone is set to UTC. That is typically fine when working with just the date. If you need to do time calculations in a worker you may need to set the correct timezone. That is typically done by calling the to method on the Arrow object with 'local' as its argument; e.g. parsed_args.forecast_date.to('local').
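For readers who want to experiment with the UTC versus local distinction without the nowcast package or Arrow installed, here is a standard library analogue. It is not how add_date_option is implemented: the _utc_date helper is hypothetical, and datetime objects stand in for Arrow objects.

```python
import argparse
import datetime


def _utc_date(text):
    """Parse YYYY-MM-DD into a timezone-aware datetime at midnight UTC."""
    parsed = datetime.datetime.strptime(text, '%Y-%m-%d')
    return parsed.replace(tzinfo=datetime.timezone.utc)


parser = argparse.ArgumentParser()
parser.add_argument(
    '--forecast-date', type=_utc_date,
    help='Date for which to download the weather forecast. Use YYYY-MM-DD format.')

parsed_args = parser.parse_args(['--forecast-date', '2024-01-15'])
forecast_date = parsed_args.forecast_date

# Roughly analogous to parsed_args.forecast_date.to('local') with Arrow:
# the same instant expressed in the machine's local timezone.
local_date = forecast_date.astimezone()
print(forecast_date.isoformat())
print(local_date.isoformat())
```

The two values represent the same instant, but their calendar dates can differ when the local timezone is west of UTC. That is why time calculations deserve the extra care described in the note.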