
How to: batch processing


Note: batch processing is still under development, which means that this guide cannot be applied yet and the information in it may change significantly.

A simple example of batch processing

Two important components of a DKB transform stage are process() and main(). Consider a very simple stage with the following functions:

def process(stage, message):
    data = message.content()
    data['new_field'] = 'value'
    stage.output(JSONMessage(data))
    return True

def main(args):
    stage = ProcessorStage()
    stage.set_input_message_type(messageType.JSON)
    stage.set_output_message_type(messageType.JSON)

    stage.process = process

    stage.configure(args)

    exit_code = stage.run()

    if exit_code == 0:
        stage.stop()

    exit(exit_code)

What happens in this code? main() performs the bare minimum of operations necessary to construct a new pyDKB stage that uses this process() and JSON messages, and process() adds a new field (or overwrites an existing one) with a fixed value.
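
For illustration, here is a rough sketch of what this process() does to one message. The classes below are made-up stand-ins, not the real pyDKB classes, and the message content is invented:

class JSONMessage(object):
    # Stand-in for the pyDKB JSON message class used in this guide.
    def __init__(self, data):
        self._data = data

    def content(self):
        return self._data

class EchoStage(object):
    # Stand-in stage that simply prints whatever is sent to output().
    def output(self, message):
        print(message.content())

process(EchoStage(), JSONMessage({'dataset': 'some.dataset.name'}))
# Prints: {'dataset': 'some.dataset.name', 'new_field': 'value'}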

This stage is intended to process a single message at a time. What needs to be altered to make it support batch mode? First, process() is changed to work with a list of messages at once:

def process(stage, messages):
    for message in messages:
        data = message.content()
        data['new_field'] = 'value'
        stage.output(JSONMessage(data))
    return True

Second, main() is updated to inform ProcessorStage that the stage supports batch processing:

def main(args):
    stage = ProcessorStage()
    stage.set_input_message_type(messageType.JSON)
    stage.set_output_message_type(messageType.JSON)

    stage.set_batch_size(10)

    stage.process = process
    ...

Here 10 is the batch size; it must be greater than 1 (the default value is 1, which is treated as "the stage does not support batch processing"). The optimal value for this parameter depends on the mechanics of a particular stage.
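
To make the effect of this parameter clearer, here is a conceptual sketch (not the actual ProcessorStage implementation) of how a stage loop may group incoming messages once a batch size is set:

def run_batched(input_messages, batch_size, stage, process):
    # Conceptual sketch only: accumulate messages into lists of up to
    # batch_size items and hand each list to process() in a single call.
    batch = []
    for message in input_messages:
        batch.append(message)
        if len(batch) == batch_size:
            process(stage, batch)
            batch = []
    if batch:
        # The last batch may contain fewer than batch_size messages.
        process(stage, batch)

With a batch size of 1 this degenerates into one process() call per message, which is why that value means "no batch processing".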

When to use batch processing

Due to its simplicity, the stage in this example does not really benefit from batch processing: if 100 messages need to be processed, there isn't much difference between cycling over 100 messages in ProcessorStage and cycling over 10 batches in ProcessorStage with 10 messages per batch in process(). The second case may even be worse because of the time and resources needed to construct batches, run additional cycles, etc. This is expected, however, since the idea here is to illustrate batch processing with a simple example; obviously, there isn't much reason to add it to actual stages with logic as simple as this one.

Where should it be used then? In stages where a batch can be processed more efficiently than a bunch of messages one by one. This is almost always the case for stages accessing data storages that provide specialized means of accepting multiple data identifiers at once: sending one request per message puts a lot of unnecessary pressure on the storage (unless it is specialized to work with such requests). A prime example is AMI (stage 095 interacts with it), and a sketch of a batched process() is shown after the list below:

  • The AMI team specifically asked us to use batch processing.
  • The difference in speed between sending one request with several dataset names and sending one request per dataset name can easily be seen (caveat: the time to get a response from AMI may sometimes vary unexpectedly, which affects how easily it is seen).
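
As a sketch of the difference, a batch-mode process() for such a stage could collect all dataset names from a batch and query the storage once, instead of once per message. The 'datasetname' field and the get_info_for_datasets() helper below are hypothetical, not the actual stage 095 or AMI client code:

def process(stage, messages):
    # Gather message contents, keyed by the (hypothetical) dataset name field.
    data_by_name = {}
    for message in messages:
        data = message.content()
        data_by_name[data['datasetname']] = data

    # One request for the whole batch instead of one request per message.
    # get_info_for_datasets() is a made-up stand-in for a real storage client call.
    info = get_info_for_datasets(list(data_by_name.keys()))

    for name, fields in info.items():
        data_by_name[name].update(fields)

    for data in data_by_name.values():
        stage.output(JSONMessage(data))
    return True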

Message order

DKB messages are, by design, independent: the stage is free to change the message order if there is something to gain from it. This also means that output messages can be produced (or not produced) wherever it is convenient:

def process(stage, messages):
    batch = []
    for message in messages:
        data = message.content()
        if data['type'] == 'some very incorrect type':
            sys.stderr.write("This message is very wrong, so I'm discarding it.")
        elif data['type'] == 'some type that should be ignored by this stage':
            sys.stderr.write("I should ignore this message, so I'm passing it on as-is.")
            stage.output(JSONMessage(data))
        else:
            batch.append(data)
    do_something_with_batch(batch)
    for data in batch:
        stage.output(JSONMessage(data))
    return True