Messaging system
In this context, a messaging system is a software component that lets clients publish messages to a queue and read them later. This simple mechanism can be implemented with many different open source tools. We have tested our implementation against the ARGO messaging system, which implements Google's Pub/Sub protocol in front of an Apache Kafka server.
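To make the publish-then-read mechanism concrete, the following Python sketch models a single queue in memory. Producers append messages, and a consumer later pulls them in arrival order. It is a toy illustration only. The class and method names are hypothetical and do not reproduce the ARGO or Google Pub/Sub client APIs, which add topics, subscriptions, acknowledgements and persistence.

```python
from __future__ import annotations

from collections import deque


class InMemoryQueue:
    """Toy stand-in for a message queue: publish now, read later (FIFO)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._messages: deque[str] = deque()

    def publish(self, message: str) -> None:
        # Producers append messages to the tail of the queue.
        self._messages.append(message)

    def pull(self, max_messages: int = 1) -> list[str]:
        # Consumers remove up to max_messages from the head, oldest first.
        batch: list[str] = []
        while self._messages and len(batch) < max_messages:
            batch.append(self._messages.popleft())
        return batch

    def __len__(self) -> int:
        return len(self._messages)


if __name__ == "__main__":
    q = InMemoryQueue("demo")
    q.publish("/zone/home/coll1")
    q.publish("/zone/home/coll2")
    print(q.pull(max_messages=10))  # ['/zone/home/coll1', '/zone/home/coll2']
```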
Here, the term "metadata" refers to system metadata, such as checksum, path, owners, etc.
The same approach could, in principle, be applied to community-specific metadata, but we have not tested it.
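As an illustration of what a system metadata message might contain, the sketch below computes a SHA-256 checksum for a local file and encodes it as `sha2:` followed by the base64 digest, the style iRODS uses for SHA-256 checksums. It then serializes the checksum with the path, size and POSIX owner id as JSON. The field names and the use of a local file instead of an iRODS data object are assumptions. This is not the message format produced by the B2SAFE rules.

```python
from __future__ import annotations

import base64
import hashlib
import json
import os
import tempfile


def sha2_checksum(path: str, chunk_size: int = 1 << 20) -> str:
    """SHA-256 of the file, encoded as 'sha2:' + base64 digest."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        # Read in chunks so large files do not have to fit in memory.
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return "sha2:" + base64.b64encode(digest.digest()).decode("ascii")


def system_metadata_message(path: str) -> str:
    """Serialize a few system metadata fields as a JSON message.

    The field names are hypothetical; they do not reproduce B2SAFE's format.
    """
    info = os.stat(path)
    record = {
        "path": os.path.abspath(path),
        "checksum": sha2_checksum(path),
        "owner_uid": info.st_uid,  # POSIX owner id, standing in for the iRODS owner
        "size": info.st_size,
    }
    return json.dumps(record, sort_keys=True)


if __name__ == "__main__":
    with tempfile.NamedTemporaryFile(delete=False) as fh:
        fh.write(b"hello")
    print(system_metadata_message(fh.name))
    os.remove(fh.name)
```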
The B2SAFE core module includes rules to push metadata to a messaging system. It supports two main scenarios:
1. The data sets are periodically updated, and the updates are concentrated in a few large collections, affecting most of the objects in each of them.
2. The data sets are periodically updated, and the updates are spread across many collections, affecting only a few objects in each.
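The trade-off between the two scenarios can be expressed as a small decision function. The Python sketch below is a hypothetical heuristic, not part of B2SAFE. If, on average, most objects of each touched collection change, re-publishing whole collections in a scheduled job (scenario 1) is reasonable. Otherwise, per-event publishing (scenario 2) avoids rescanning large collections. The 0.5 threshold and the `CollectionUpdate` type are assumptions, and the sketch only looks at the fraction of changed objects, not at the number of collections.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class CollectionUpdate:
    """Summary of one update cycle for a single collection (hypothetical)."""
    path: str
    changed_objects: int
    total_objects: int


def choose_push_strategy(updates: list[CollectionUpdate],
                         changed_fraction_threshold: float = 0.5) -> str:
    """Return 'async' (scenario 1) or 'sync' (scenario 2).

    'async' stands for a scheduled bulk push per collection,
    'sync' for event-driven pushes triggered by each change.
    """
    touched = [u for u in updates if u.total_objects > 0 and u.changed_objects > 0]
    if not touched:
        return "sync"  # nothing worth bulk-publishing
    # Average, over touched collections, of the fraction of objects that changed.
    mean_fraction = sum(u.changed_objects / u.total_objects for u in touched) / len(touched)
    return "async" if mean_fraction >= changed_fraction_threshold else "sync"
```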
In case 1, it is better to schedule an asynchronous operation that reads the metadata from B2SAFE and publishes it on a queue; for this, use the rule EUDATPushMetadata(*path, *queue). In case 2, it is better to trigger a synchronous operation whenever a collection or object changes, relying on the rules EUDATPushCollMetadata(*path, *queue), EUDATPushObjMetadata(*path, *queue) and EUDATMessage(*queue, *message). For example:
For collections:
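```
acPostProcForCollCreate() {
    writeLine("serverLog", "DEBUG:acPostProcForCollCreate");
    # no explicit queue: presumably the control queue configured in local.re is used
    *queue = 'None';
    # the message carries the name of the newly created collection
    *message = $collName;
    # the returned name identifies the queue that receives the collection metadata
    *queueName = EUDATMessage(*queue, *message);
    EUDATPushCollMetadata($collName, *queueName);
}
```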
For objects:
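```
acPostProcForModifyDataObjMeta() {
    writeLine("serverLog", "DEBUG:acPostProcForModifyDataObjMeta");
    # split the object path into its parent collection and object name
    msiSplitPath($objPath, *parent, *child);
    *queue = 'None';
    # the message refers to the collection that contains the modified object
    *message = *parent;
    *queueName = EUDATMessage(*queue, *message);
    EUDATPushObjMetadata($objPath, *queueName);
}
```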
The rule EUDATMessage relies on the configuration of the messaging system client script, in this case the ARGO client. To set it up:
- create the following link in iRODS_HOME/EXECUTABLES_DIR: messageManager.py -> /opt/eudat/b2safe/cmd/argo.py;
- fill in the configuration parameters in /opt/eudat/b2safe/conf/argo.conf;
- set the desired values in /opt/eudat/b2safe/rulebase/local.re, in the rule getMessageParameters(*msgConfPath, *controlQueueName, *enabled).
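The linking step amounts to creating a symbolic link, which a single `ln -s` command also does. A minimal Python sketch follows. `executables_dir` stands for the iRODS_HOME/EXECUTABLES_DIR directory of your installation, whose location depends on the iRODS version. The helper name `link_message_manager` is hypothetical.

```python
from __future__ import annotations

import os
import tempfile


def link_message_manager(executables_dir: str,
                         client_script: str = "/opt/eudat/b2safe/cmd/argo.py",
                         link_name: str = "messageManager.py") -> str:
    """Create executables_dir/link_name as a symlink to client_script.

    Roughly equivalent to `ln -sf client_script executables_dir/link_name`.
    """
    link_path = os.path.join(executables_dir, link_name)
    # Remove a previous link or file first so the step can be re-run safely.
    if os.path.lexists(link_path):
        os.remove(link_path)
    os.symlink(client_script, link_path)
    return link_path


if __name__ == "__main__":
    # Demonstration in a scratch directory instead of the real iRODS one.
    with tempfile.TemporaryDirectory() as scratch:
        target = os.path.join(scratch, "argo.py")
        open(target, "w").close()
        link = link_message_manager(scratch, client_script=target)
        print(link, "->", os.readlink(link))
```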
The script cmd/b2safe_neo4j_client.py embeds an ARGO client, which relies on the input parameters defined in the file conf/b2safe_neo4j.conf. The script reads a queue called "control", on which one message is published for each collection modified inside the B2SAFE storage. Each message includes a pointer to another queue, dedicated to all the changes of that collection. The control queue is persistent, while the collection-specific queues are deleted once all their messages have been retrieved.
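The control-queue protocol described above can be simulated in memory. In the Python sketch below, a producer publishes, for each change, a pointer to a per-collection queue on the persistent "control" queue and the change itself on that dedicated queue. The consumer follows each pointer, drains the dedicated queue and then deletes it. The `Broker` class, the queue naming scheme and the message contents are hypothetical stand-ins for the ARGO client embedded in cmd/b2safe_neo4j_client.py. Since the same collection may be announced more than once, the consumer skips pointers to queues it has already drained and deleted.

```python
from __future__ import annotations

from collections import deque


class Broker:
    """In-memory stand-in for the messaging system (not the ARGO API)."""

    def __init__(self, control_queue: str = "control") -> None:
        self.control_queue = control_queue
        self.queues: dict[str, deque[str]] = {control_queue: deque()}

    def publish(self, queue: str, message: str) -> None:
        # Create the queue on first use, then append the message.
        self.queues.setdefault(queue, deque()).append(message)

    def drain(self, queue: str) -> list[str]:
        """Pop every message currently in the queue, oldest first."""
        q = self.queues.get(queue, deque())
        out = list(q)
        q.clear()
        return out

    def delete(self, queue: str) -> None:
        self.queues.pop(queue, None)


def collection_queue_name(collection: str) -> str:
    # Hypothetical naming scheme for the per-collection queue.
    return "coll" + collection.replace("/", "_")


def publish_change(broker: Broker, collection: str, change: str) -> None:
    """Producer side: announce the collection, then queue its change."""
    queue = collection_queue_name(collection)
    broker.publish(broker.control_queue, queue)  # pointer to the dedicated queue
    broker.publish(queue, change)


def consume(broker: Broker) -> dict[str, list[str]]:
    """Consumer side: follow each pointer and drain the pointed-to queue."""
    changes: dict[str, list[str]] = {}
    for queue in broker.drain(broker.control_queue):  # the control queue itself is kept
        if queue not in broker.queues:
            continue  # already drained and deleted via an earlier pointer
        changes.setdefault(queue, []).extend(broker.drain(queue))
        broker.delete(queue)  # collection-specific queue removed once emptied
    return changes
```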