
Messaging system

ccacciari edited this page Sep 12, 2017 · 5 revisions

In this context, a messaging system is a software component that allows clients to publish messages on a queue and read them later. This simple mechanism can be implemented with many different open source tools. We have tested our implementation against the ARGO messaging system, which implements Google's Pub/Sub protocol on top of an Apache Kafka server.
By "metadata" we mean system metadata, such as checksum, path, owner, etc.
The same approach could, in principle, be applied to community-specific metadata, but we have not tested it.

Messaging workflow

The messages are exchanged according to the following workflow:

The "control" queue is permanent, while the other queues are temporary and are deleted once they are empty. The control queue receives one message for each change event, for example:

/cinecaDMPZone2/home/claudio/collection_A

The message contains only the collection's absolute path. The MD5 hash of this path is the name of the related temporary queue, which collects all the changes to that collection. iRODS rules send the messages to the control queue, create the related temporary queues and publish the objects' metadata on them. The GraphDB client does the opposite: it gets the names of the collections affected by the updates from the control queue, then reads the metadata from the temporary queues, deleting each one once it is empty. This is a typical message with metadata:
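The queue-naming convention can be sketched in a few lines of Python. This is an illustration, not B2SAFE code. It assumes the queue name is the hexadecimal MD5 digest of the UTF-8 encoded absolute path. The function name `temp_queue_name` is hypothetical.

```python
import hashlib


def temp_queue_name(collection_path: str) -> str:
    """Return the (assumed) name of a collection's temporary queue.

    Assumption: the name is the hex MD5 digest of the UTF-8 encoded
    absolute collection path; the real B2SAFE encoding may differ.
    """
    return hashlib.md5(collection_path.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    path = "/cinecaDMPZone2/home/claudio/collection_A"
    # The same path always maps to the same 32-character queue name.
    print(temp_queue_name(path))
```

Because the hash is deterministic, producer and consumer can agree on the temporary queue name without exchanging it explicitly. Only the collection path travels on the control queue.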

{ /cinecaDMPZone2/home/claudio/collection_A:
  { objects:  [butterfly.txt=:=resource=:=cinecaRes1,
               butterfly.txt=:=owner=:=claudio,
               butterfly.txt=:=checksum=:=
              ] 
  } 
}
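Each entry in the `objects` list packs an object name, an attribute and a value, separated by `=:=`. A minimal Python sketch of how a client might group these entries by object is shown below. It assumes the entries have already been extracted from the message as plain strings. The sample above is not strict JSON, so the outer decoding is left out. The function name `parse_metadata_entries` is hypothetical.

```python
from collections import defaultdict
from typing import Dict, List

SEPARATOR = "=:="  # separator used in the sample message above


def parse_metadata_entries(entries: List[str]) -> Dict[str, Dict[str, str]]:
    """Group 'object=:=attribute=:=value' strings by object name."""
    result: Dict[str, Dict[str, str]] = defaultdict(dict)
    for entry in entries:
        # maxsplit=2 keeps any separator that appears inside the value.
        parts = entry.split(SEPARATOR, 2)
        if len(parts) != 3:
            raise ValueError(f"malformed metadata entry: {entry!r}")
        obj, attribute, value = parts
        result[obj][attribute] = value
    return dict(result)


if __name__ == "__main__":
    sample = [
        "butterfly.txt=:=resource=:=cinecaRes1",
        "butterfly.txt=:=owner=:=claudio",
        "butterfly.txt=:=checksum=:=",
    ]
    print(parse_metadata_entries(sample))
```

An entry with an empty value, like the checksum entry in the sample, is kept as an empty string rather than dropped.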

Publishing metadata

The B2SAFE core module includes some rules to push metadata to a messaging system. It supports two main scenarios:

  1. The data sets are periodically updated, and the updates are concentrated in a few large collections, affecting most of the objects in each collection.
  2. The data sets are periodically updated, and the updates are spread across many collections, affecting only a few objects per collection.

In case 1, it is better to schedule an asynchronous operation that reads the metadata from B2SAFE and publishes it on a queue, using the rule EUDATPushMetadata(*path, *queue). In case 2, it is better to trigger a synchronous operation, relying on the rules EUDATPushCollMetadata(*path, *queue), EUDATPushObjMetadata(*path, *queue) and EUDATMessage(*queue, *message). For example:

For collections:

acPostProcForCollCreate() {
  writeLine("serverLog", "DEBUG:acPostProcForCollCreate");
  *queue = 'None';
  *message = $collName;
  *queueName = EUDATMessage(*queue, *message);
  EUDATPushCollMetadata($collName, *queueName);
}

For objects:

acPostProcForModifyDataObjMeta() {
  writeLine("serverLog", "DEBUG:acPostProcForModifyDataObjMeta");
  msiSplitPath($objPath, *parent, *child);
  *queue = 'None';
  *message = *parent;
  *queueName = EUDATMessage(*queue, *message);
  EUDATPushObjMetadata($objPath, *queueName);
}
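Both rules above follow the same pattern:

1. Identify the affected collection.
2. Announce it on the control queue.
3. Push the metadata to that collection's temporary queue.

The Python sketch below mimics this pattern for the object case against a hypothetical in-memory broker. It does not reproduce what EUDATMessage or EUDATPushObjMetadata do internally. `InMemoryBroker`, `CONTROL_QUEUE` and `on_object_metadata_change` are illustrative names, not B2SAFE or ARGO APIs. The MD5 queue naming and the JSON encoding of the message layout are also assumptions.

```python
import hashlib
import json
import posixpath
from collections import defaultdict, deque
from typing import Deque, Dict


class InMemoryBroker:
    """Hypothetical stand-in for the messaging system (not the ARGO API)."""

    def __init__(self) -> None:
        # Publishing to an unknown queue creates it, like a temporary queue.
        self.queues: Dict[str, Deque[str]] = defaultdict(deque)

    def publish(self, queue: str, message: str) -> None:
        self.queues[queue].append(message)


CONTROL_QUEUE = "control"  # hypothetical name of the permanent control queue


def on_object_metadata_change(
    broker: InMemoryBroker, obj_path: str, metadata: Dict[str, str]
) -> str:
    """Announce the parent collection, then publish the object's metadata.

    Returns the name of the temporary queue that received the metadata.
    """
    # Like msiSplitPath: split the object path into parent and child.
    parent, child = posixpath.split(obj_path)
    broker.publish(CONTROL_QUEUE, parent)
    # Assumption: temporary queue name = hex MD5 of the collection path.
    queue_name = hashlib.md5(parent.encode("utf-8")).hexdigest()
    # Assumption: JSON encoding of the {collection: {objects: [...]}} layout.
    entries = [f"{child}=:={key}=:={value}" for key, value in metadata.items()]
    broker.publish(queue_name, json.dumps({parent: {"objects": entries}}))
    return queue_name
```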

The rule EUDATMessage relies on the configuration of the messaging system client script, in this case the ARGO client. To configure it:

  • create the following link in iRODS_HOME/EXECUTABLES_DIR: messageManager.py -> /opt/eudat/b2safe/cmd/argo.py;
  • fill in the configuration parameters in /opt/eudat/b2safe/conf/argo.conf;
  • set the desired values in /opt/eudat/b2safe/rulebase/local.re, in the rule getMessageParameters(*msgConfPath, *controlQueueName, *enabled).

Reading metadata

The script cmd/b2safe_neo4j_client.py embeds an ARGO client, configured through the input parameters defined in the file conf/b2safe_neo4j.conf. The script reads the control queue, which holds one message for each collection modified in the B2SAFE storage. It then uses this information to retrieve the metadata of every object affected by the updates.
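The reading side can be sketched as a loop over the control queue. This is not the code of b2safe_neo4j_client.py. The queues are modelled as a hypothetical in-memory mapping from queue names to FIFOs of messages. `drain_updates` and `queue_name_for` are illustrative names. The MD5 queue naming and JSON message encoding are assumptions.

```python
import hashlib
import json
from collections import deque
from typing import Deque, Dict, List


def queue_name_for(collection_path: str) -> str:
    # Assumption: temporary queue name = hex MD5 of the collection path.
    return hashlib.md5(collection_path.encode("utf-8")).hexdigest()


def drain_updates(
    queues: Dict[str, Deque[str]], control_queue: str = "control"
) -> Dict[str, List[str]]:
    """Read the control queue, then each affected collection's temporary queue.

    Returns, per collection, the raw 'object=:=attribute=:=value' entries.
    """
    updates: Dict[str, List[str]] = {}
    control = queues.get(control_queue, deque())
    while control:
        collection = control.popleft()
        name = queue_name_for(collection)
        temp = queues.get(name)
        if temp is None:
            # Already drained by an earlier control message for this collection.
            continue
        while temp:
            message = json.loads(temp.popleft())
            entries = message.get(collection, {}).get("objects", [])
            updates.setdefault(collection, []).extend(entries)
        # The temporary queue is now empty, so delete it; the control queue stays.
        del queues[name]
    return updates
```

Skipping a collection whose temporary queue has already been deleted handles the case where several change events for the same collection appear on the control queue.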
