How to Write a Service
This tutorial will teach you how to write your own service. We will implement a labelizer that receives documents and labels them with topics (e.g. "I like tomatoes" might receive the labels "fruit" and "red").
In this tutorial step you will learn about:
To begin with, each service derives from BaseService and needs to implement the on_message method. The service must also be instantiated when the script is called.
import json
import sys

from roxcomposer import base_service


class YourService(base_service.BaseService):
    def __init__(self, params=None):
        super().__init__(params)

    def on_message(self, msg, msg_id, parameters=None):
        new_msg = do_stuff_with(msg)  # placeholder: replace with your own processing
        self.dispatch(new_msg)  # pass the result on to the next service in the pipeline


if __name__ == '__main__':
    kwargs = None
    if len(sys.argv) > 1:
        kwargs = json.loads(sys.argv[1])  # service parameters passed as a JSON string
    service = YourService(kwargs)
    service.listen()
The constructor's params object is used to initialize the service with its proper parameters. It is a dictionary containing all the information needed to run the service, mainly the IP address and port on which the service can be reached; it can be extended with other settings, e.g. for logging. You can easily set these parameters using the GUI. See How to Add a Service for details.
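For illustration, here is a minimal sketch of how the params dictionary reaches the constructor when the script is started with a JSON string as its first argument, mirroring the __main__ block above. The helper name load_params and the keys "ip" and "port" are hypothetical placeholders; check How to Add a Service for the names the ROXcomposer actually uses.

```python
import json
import sys


def load_params(argv):
    """Return the service parameters passed as a JSON string in argv[1], or None.

    Mirrors the `if __name__ == '__main__'` block of the tutorial's services.
    """
    if len(argv) > 1:
        return json.loads(argv[1])
    return None


if __name__ == '__main__':
    # Example invocation (hypothetical keys):
    #   python your_service.py '{"ip": "127.0.0.1", "port": 5001}'
    print(load_params(sys.argv))
```

Note that params is None when no argument is given, so a constructor should not assume it is always a dictionary.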
With the on_message() method, you define what happens to a received message before it is dispatched to the following services in the pipeline. The msg parameter is simply a string containing any information you like, whereas msg_id contains the message's unique identifier.
You can also provide parameters, which are specific to the pipeline. When you create your pipeline, you might want to use a service in a specific way. For example, a service that stores data in an SQL database might need to write to different tables depending on the use case. In that case you can use parameters, which is simply a list of strings, to tell the service where the data should be stored, e.g.:
parameters=["table=test_table"]
Because the parameters are plain strings, you can format them however you like; you choose how to parse them in your service.
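One possible convention, assumed here rather than prescribed by the ROXcomposer, is to write each parameter as a key=value string. A hypothetical helper parse_pipeline_parameters could then turn the list into a dictionary inside on_message:

```python
def parse_pipeline_parameters(parameters):
    """Turn a list like ["table=test_table"] into {"table": "test_table"}.

    Only the first "=" splits key and value; entries without "=" are
    treated as flags and mapped to True. None yields an empty dict.
    """
    parsed = {}
    for entry in parameters or []:
        key, sep, value = entry.partition("=")
        parsed[key.strip()] = value.strip() if sep else True
    return parsed


print(parse_pipeline_parameters(["table=test_table", "verbose"]))
# {'table': 'test_table', 'verbose': True}
```

Inside a service, on_message could then read parse_pipeline_parameters(parameters).get("table") to pick the target table.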
After listen is called, the service listens on the provided network address, and the payload of each received message is passed to the on_message method.
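To try out on_message before deploying a service, you could swap BaseService for a local stand-in that skips the network entirely: listen() iterates over a list of messages and hands each one to on_message, and dispatch() just records the outgoing message. LocalStandInService is a hypothetical class that only imitates the call sequence described above; it is not the real BaseService and does not reproduce its networking or logging setup.

```python
import logging
import uuid


class LocalStandInService:
    """Hypothetical offline stand-in for base_service.BaseService (testing only)."""

    def __init__(self, params=None):
        self.params = params or {}
        self.logger = logging.getLogger(type(self).__name__)
        self.dispatched = []  # messages passed to dispatch(), in order

    def dispatch(self, msg):
        # The real service forwards msg to the next service in the pipeline;
        # here we only record it.
        self.dispatched.append(msg)
        return msg

    def listen(self, messages=()):
        # The real service waits on its network address; here we just
        # feed each message to on_message with a generated id.
        for msg in messages:
            self.on_message(msg, str(uuid.uuid4()))

    def on_message(self, msg, msg_id, parameters=None):
        raise NotImplementedError


class UpperCaseService(LocalStandInService):
    def on_message(self, msg, msg_id, parameters=None):
        self.dispatch(msg.upper())


service = UpperCaseService()
service.listen(["hello", "world"])
print(service.dispatched)  # ['HELLO', 'WORLD']
```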
Now, let's write a service that labels incoming messages. For this example, we assume that a message is a JSON-formatted string containing a document key. In general, though, a message can have any structure you prefer.
As a first step, let's define the Labelizer and an essential parameter, filepath. The filepath parameter specifies the file in which our labels and their associated terms are stored.
import json
import sys

from roxcomposer import base_service


class Labelizer(base_service.BaseService):
    def __init__(self, params=None):
        self.filepath = "labeled_words.json"  # default label file
        super().__init__(params)
        # params is None when the script is started without arguments
        if params and 'filepath' in params:
            self.filepath = params['filepath']


if __name__ == '__main__':
    kwargs = None
    if len(sys.argv) > 1:
        kwargs = json.loads(sys.argv[1])
    service = Labelizer(kwargs)
    service.listen()
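When the script is started without arguments, kwargs stays None, so the constructor must not assume that params is a dictionary. A sketch of a None-safe lookup, written as a hypothetical helper resolve_filepath:

```python
DEFAULT_FILEPATH = "labeled_words.json"


def resolve_filepath(params, default=DEFAULT_FILEPATH):
    """Return params['filepath'] if present, otherwise the default.

    params may be None when the service is started without a JSON argument.
    """
    return (params or {}).get("filepath", default)


print(resolve_filepath(None))                     # labeled_words.json
print(resolve_filepath({"filepath": "my.json"}))  # my.json
```

In the constructor this would amount to self.filepath = resolve_filepath(params).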
Of course, we need to do something with the messages we receive. Here, we assume that the message has the following structure:
{
    "document": "I like tomatoes"
}
The on_message method might look like this:
def on_message(self, msg, msg_id, parameters=None):
    self.logger.info("received message: " + str(msg))
    # get the labels and their associated terms; fall back to no labels if loading failed
    label_data = self.get_label_data() or {}
    dispatch_msg = msg  # on error, forward the message unchanged
    try:
        data = json.loads(msg)  # create a JSON object from the message string
        document = data["document"]  # read the document that should be labeled
        data["labels"] = []
        # check all labels; if a term occurs in the document, add its label
        for item in label_data.values():
            if item["term"] in document:
                data["labels"].append(item["label"])
        dispatch_msg = json.dumps(data)  # convert back to a string
    except json.decoder.JSONDecodeError:
        self.logger.error("could not read incoming message as json: " + str(msg))
    except (KeyError, TypeError):
        self.logger.error("could not label message (missing or invalid key): " + str(msg))
    self.logger.info("dispatching message: " + str(dispatch_msg))
    return self.dispatch(dispatch_msg)
Now we have labeled the document with (hopefully) appropriate topics, and the log messages let us track the message status and record potential errors. You may choose not to catch exceptions; however, if your service then encounters an exception and crashes, all pipelines that contain it will become inactive. The ROXcomposer itself should stay unaffected, and a log message will tell you why your service crashed. In some cases, deactivating all pipelines that contain this service might even be the desired outcome.
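The labeling logic itself does not depend on the ROXcomposer, so it can be pulled out and tested on its own. Below is a sketch using a hypothetical function label_document, which applies the same substring test (term in document) to a message string and returns the JSON string that would be dispatched. Error handling is left to the caller here.

```python
import json


def label_document(msg, label_data):
    """Add a "labels" list to a JSON message using plain substring matching.

    label_data maps arbitrary item names to {"term": ..., "label": ...}.
    Raises json.JSONDecodeError or KeyError on malformed input.
    """
    data = json.loads(msg)
    document = data["document"]
    data["labels"] = [
        item["label"] for item in label_data.values() if item["term"] in document
    ]
    return json.dumps(data)


labels = {
    "item1": {"term": "tomato", "label": "fruit"},
    "item2": {"term": "cucumber", "label": "vegetable"},
    "item3": {"term": "tomato", "label": "red"},
}
print(label_document('{"document": "I like tomatoes"}', labels))
# {"document": "I like tomatoes", "labels": ["fruit", "red"]}
```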
Here is the get_label_data method used in on_message:
def get_label_data(self):
    try:
        with open(self.filepath) as f:
            return json.load(f)  # dict mapping item names to {"term": ..., "label": ...}
    except FileNotFoundError:
        self.logger.error("could not find file {}".format(self.filepath))
        return {}  # no labels available
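A standalone sketch of the loading step that also covers a label file containing invalid JSON and always returns a dictionary, so the caller can iterate over the result safely. load_label_data is a hypothetical name, and a module-level standard-library logger stands in for the service's self.logger.

```python
import json
import logging
import os
import tempfile

logger = logging.getLogger("labelizer")


def load_label_data(filepath):
    """Load the label file; return {} if it is missing or not valid JSON."""
    try:
        with open(filepath) as f:
            return json.load(f)
    except FileNotFoundError:
        logger.error("could not find file %s", filepath)
    except json.JSONDecodeError:
        logger.error("file %s does not contain valid JSON", filepath)
    return {}


# Usage: a valid file and a missing file
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "labeled_words.json")
    with open(path, "w") as f:
        json.dump({"item1": {"term": "tomato", "label": "fruit"}}, f)
    print(load_label_data(path))  # {'item1': {'term': 'tomato', 'label': 'fruit'}}
    print(load_label_data(os.path.join(tmp, "missing.json")))  # {}
```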
As you can see, this method loads the list of labels from the file at self.filepath. The JSON file containing the labeled words might look like this:
{
    "item1": {
        "term": "tomato",
        "label": "fruit"
    },
    "item2": {
        "term": "cucumber",
        "label": "vegetable"
    },
    "item3": {
        "term": "tomato",
        "label": "red"
    }
}
To test the labelizer, save this JSON file and pass its path as the filepath parameter when you register the service on the ROXcomposer in the next step.
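When choosing terms for the label file, keep in mind that on_message uses a plain substring test (term in document): "tomato" matches "tomatoes", which the example relies on, but a short term such as "red" would also match "bored". If that is a problem for your labels, one alternative, swapped in here and not part of the tutorial's service, is a regular expression that requires each term to start at a word boundary. term_matches is a hypothetical helper name.

```python
import re


def term_matches(term, document):
    """True if `term` occurs in `document` starting at a word boundary.

    The match may continue into a longer word, so "tomato" still matches
    "tomatoes", but "red" no longer matches "bored". Assumes terms start
    with a letter or digit; matching is case-sensitive, like `in`.
    """
    return re.search(r"\b" + re.escape(term), document) is not None


print(term_matches("tomato", "I like tomatoes"))  # True
print(term_matches("red", "I am bored"))          # False
```

To use it, you would replace the term in document check in on_message with term_matches(term, document).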
labelizer.py:
import json
import sys

from roxcomposer import base_service


class Labelizer(base_service.BaseService):
    def __init__(self, params=None):
        self.filepath = "labeled_words.json"  # default label file
        super().__init__(params)
        # params is None when the script is started without arguments
        if params and 'filepath' in params:
            self.filepath = params['filepath']

    def on_message(self, msg, msg_id, parameters=None):
        self.logger.info("received message: " + str(msg))
        # get the labels and their associated terms; fall back to no labels if loading failed
        label_data = self.get_label_data() or {}
        dispatch_msg = msg  # on error, forward the message unchanged
        try:
            data = json.loads(msg)  # create a JSON object from the message string
            document = data["document"]  # read the document that should be labeled
            data["labels"] = []
            # check all labels; if a term occurs in the document, add its label
            for item in label_data.values():
                if item["term"] in document:
                    data["labels"].append(item["label"])
            dispatch_msg = json.dumps(data)  # convert back to a string
        except json.decoder.JSONDecodeError:
            self.logger.error("could not read incoming message as json: " + str(msg))
        except (KeyError, TypeError):
            self.logger.error("could not label message (missing or invalid key): " + str(msg))
        self.logger.info("dispatching message: " + str(dispatch_msg))
        return self.dispatch(dispatch_msg)

    def get_label_data(self):
        try:
            with open(self.filepath) as f:
                return json.load(f)  # dict mapping item names to {"term": ..., "label": ...}
        except FileNotFoundError:
            self.logger.error("could not find file {}".format(self.filepath))
            return {}  # no labels available


if __name__ == '__main__':
    kwargs = None
    if len(sys.argv) > 1:
        kwargs = json.loads(sys.argv[1])  # service parameters passed as a JSON string
    service = Labelizer(kwargs)
    service.listen()
labeled_words.json:
{
    "item1": {
        "term": "tomato",
        "label": "fruit"
    },
    "item2": {
        "term": "cucumber",
        "label": "vegetable"
    },
    "item3": {
        "term": "tomato",
        "label": "red"
    }
}