Skip to content

Commit

Permalink
Merge pull request #407 from jklaise/get_meta
Browse files Browse the repository at this point in the history
Add ability to fetch metadata from model and transformer components
  • Loading branch information
ukclivecox committed Jan 25, 2019
2 parents 224e22c + 2e7888f commit 714fa98
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 41 deletions.
30 changes: 23 additions & 7 deletions python/seldon_core/microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
DEBUG = False

ANNOTATIONS_FILE = "/etc/podinfo/annotations"
ANNOTATION_GRPC_MAX_MSG_SIZE = 'seldon.io/grpc-max-message-size'
ANNOTATION_GRPC_MAX_MSG_SIZE = 'seldon.io/grpc-max-message-size'

def startServers(target1, target2):
p2 = mp.Process(target=target2)
Expand Down Expand Up @@ -119,6 +119,13 @@ def get_data_from_json(message):
"Can't find data in json: " + strJson)


def get_meta_from_json(message):
    """Return the "meta" entry of a JSON request dict, or {} when absent.

    Note: a stored value of None is returned as-is, matching the original
    key-presence check.
    """
    return message.get("meta", {})


def rest_datadef_to_array(datadef):
if datadef.get("tensor") is not None:
features = np.array(datadef.get("tensor").get("values")).reshape(
Expand Down Expand Up @@ -167,6 +174,11 @@ def get_data_from_proto(request):
raise SeldonMicroserviceException("Unknown data in SeldonMessage")


def get_meta_from_proto(request):
    """Convert the SeldonMessage proto's ``meta`` field to a plain dict.

    An unset meta field yields an empty dict (protobuf default-message
    behavior of MessageToDict).
    """
    return json_format.MessageToDict(request.meta)


def grpc_datadef_to_array(datadef):
data_type = datadef.WhichOneof("data_oneof")
if data_type == "tensor":
Expand Down Expand Up @@ -272,7 +284,7 @@ def main():
default=0, const=1, type=int)
parser.add_argument("--parameters", type=str,
default=os.environ.get(PARAMETERS_ENV_NAME, "[]"))
parser.add_argument("--log-level", type=str, default='INFO')
parser.add_argument("--log-level", type=str, default="INFO")
parser.add_argument("--tracing", nargs='?',
default=int(os.environ.get("TRACING", "0")), const=1, type=int)

Expand All @@ -285,6 +297,7 @@ def main():
if not isinstance(log_level_num, int):
raise ValueError('Invalid log level: %s', args.log_level)
logger.setLevel(log_level_num)
logger.debug("Log level set to %s:%s", args.log_level, log_level_num)

DEBUG = False
if parameters.get(DEBUG_PARAMETER):
Expand Down Expand Up @@ -315,6 +328,9 @@ def main():
elif args.service_type == "OUTLIER_DETECTOR":
import seldon_core.outlier_detector_microservice as seldon_microservice

# set log level for the imported microservice type
seldon_microservice.logger.setLevel(log_level_num)

port = int(os.environ.get(SERVICE_PORT_ENV_NAME, DEFAULT_PORT))

if args.tracing:
Expand Down Expand Up @@ -349,13 +365,13 @@ def main():
config = Config(
config=config_dict,
service_name=args.interface_name,
validate=True,
validate=True,
)
# this call also sets opentracing.tracer
tracer = config.initialize_tracer()



if args.api_type == "REST":

def rest_prediction_server():
Expand All @@ -365,7 +381,7 @@ def rest_prediction_server():
if args.tracing:
from flask_opentracing import FlaskTracer
tracing = FlaskTracer(tracer,True, app)

app.run(host='0.0.0.0', port=port)

logger.info("REST microservice running on port %i",port)
Expand All @@ -380,10 +396,10 @@ def grpc_prediction_server():
interceptor = open_tracing_server_interceptor(tracer)
else:
interceptor = None

server = seldon_microservice.get_grpc_server(
user_object, debug=DEBUG, annotations=annotations, trace_interceptor=interceptor)

server.add_insecure_port("0.0.0.0:{}".format(port))

server.start()
Expand Down
20 changes: 13 additions & 7 deletions python/seldon_core/model_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
from seldon_core.proto import prediction_pb2, prediction_pb2_grpc
from seldon_core.microservice import extract_message, sanity_check_request, rest_datadef_to_array, \
array_to_rest_datadef, grpc_datadef_to_array, array_to_grpc_datadef, \
SeldonMicroserviceException, get_custom_tags, get_data_from_json, get_data_from_proto, ANNOTATION_GRPC_MAX_MSG_SIZE
SeldonMicroserviceException, get_custom_tags, get_data_from_json, get_data_from_proto, \
get_meta_from_json, get_meta_from_proto, ANNOTATION_GRPC_MAX_MSG_SIZE
from seldon_core.metrics import get_custom_metrics
from seldon_core.seldon_flatbuffers import SeldonRPCToNumpyArray, NumpyArrayToSeldonRPC, CreateErrorMsg

Expand All @@ -29,8 +30,11 @@
# ---------------------------


def predict(user_model, features, feature_names, **kwargs):
    """Invoke the user model's ``predict``, forwarding extra keyword
    arguments (e.g. ``meta=...``) only when the model's signature accepts
    them.

    The original ``try/except TypeError`` retry had a defect: a TypeError
    raised *inside* a new-style model's predict was swallowed and the model
    was silently called a second time, double-executing side effects and
    masking the real error. Signature introspection avoids both problems.

    Parameters
    ----------
    user_model : object exposing ``predict(features, feature_names[, **kw])``
    features : array-like payload extracted from the request
    feature_names : list of column names for the payload
    kwargs : optional extras such as ``meta`` — presumably the request
        metadata dict; TODO confirm full set with callers

    Returns
    -------
    Whatever the user model's predict returns (propagated unchanged).
    """
    import inspect  # local import keeps this block self-contained

    if kwargs:
        try:
            params = inspect.signature(user_model.predict).parameters
            accepts = any(
                p.kind is inspect.Parameter.VAR_KEYWORD
                for p in params.values()
            ) or all(name in params for name in kwargs)
        except (TypeError, ValueError):
            # C-implemented or otherwise non-introspectable callables:
            # fall back to the legacy two-argument call.
            accepts = False
        if accepts:
            return user_model.predict(features, feature_names, **kwargs)
    # Legacy user models only take (features, feature_names).
    return user_model.predict(features, feature_names)


def send_feedback(user_model, features, feature_names, reward, truth):
Expand Down Expand Up @@ -77,8 +81,9 @@ def Predict():
else:
features = get_data_from_json(request)
names = request.get("data", {}).get("names")
meta = get_meta_from_json(request)

predictions = predict(user_model, features, names)
predictions = predict(user_model, features, names, meta=meta)
logger.debug("Predictions: %s", predictions)

# If predictions is an numpy array or we used the default data then return as numpy array
Expand Down Expand Up @@ -106,7 +111,7 @@ def Predict():
def SendFeedback():
feedback = extract_message()
logger.debug("Feedback received: %s", feedback)

if hasattr(user_model, "send_feedback_rest"):
return jsonify(user_model.send_feedback_rest(feedback))
else:
Expand Down Expand Up @@ -138,9 +143,10 @@ def Predict(self, request, context):
return self.user_model.predict_grpc(request)
else:
features = get_data_from_proto(request)
meta = get_meta_from_proto(request)
datadef = request.data
data_type = request.WhichOneof("data_oneof")
predictions = predict(self.user_model, features, datadef.names)
predictions = predict(self.user_model, features, datadef.names, meta=meta)

# Construct meta data
meta = prediction_pb2.Meta()
Expand Down Expand Up @@ -204,7 +210,7 @@ def get_grpc_server(user_model, debug=False, annotations={}, trace_interceptor=N
if trace_interceptor:
from grpc_opentracing.grpcext import intercept_server
server = intercept_server(server, trace_interceptor)

prediction_pb2_grpc.add_ModelServicer_to_server(seldon_model, server)

return server
Expand Down
34 changes: 23 additions & 11 deletions python/seldon_core/transformer_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from seldon_core.proto import prediction_pb2, prediction_pb2_grpc
from seldon_core.microservice import extract_message, sanity_check_request, rest_datadef_to_array, \
array_to_rest_datadef, grpc_datadef_to_array, array_to_grpc_datadef, \
SeldonMicroserviceException, get_custom_tags, get_data_from_json, get_data_from_proto, ANNOTATION_GRPC_MAX_MSG_SIZE
SeldonMicroserviceException, get_custom_tags, get_data_from_json, get_data_from_proto, \
get_meta_from_json, get_meta_from_proto, ANNOTATION_GRPC_MAX_MSG_SIZE
from seldon_core.metrics import get_custom_metrics

logger = logging.getLogger(__name__)
Expand All @@ -20,16 +21,22 @@
# ---------------------------


def transform_input(user_model, features, feature_names, **kwargs):
    """Apply the user model's optional ``transform_input``, forwarding
    extra keyword arguments (e.g. ``meta=...``) only when its signature
    accepts them; models without the hook pass features through untouched.

    The original ``try/except TypeError`` retry could swallow a TypeError
    raised inside a new-style transform and silently call it twice;
    signature introspection avoids that.

    Parameters
    ----------
    user_model : object optionally exposing
        ``transform_input(features, feature_names[, **kw])``
    features : array-like payload extracted from the request
    feature_names : list of column names for the payload
    kwargs : optional extras such as ``meta`` — TODO confirm with callers

    Returns
    -------
    The transformed features, or ``features`` unchanged when the model
    defines no ``transform_input``.
    """
    import inspect  # local import keeps this block self-contained

    if not hasattr(user_model, "transform_input"):
        # Transformation is optional: identity by default.
        return features
    if kwargs:
        try:
            params = inspect.signature(
                user_model.transform_input).parameters
            accepts = any(
                p.kind is inspect.Parameter.VAR_KEYWORD
                for p in params.values()
            ) or all(name in params for name in kwargs)
        except (TypeError, ValueError):
            # Non-introspectable callable: use the legacy call form.
            accepts = False
        if accepts:
            return user_model.transform_input(
                features, feature_names, **kwargs)
    # Legacy transformers only take (features, feature_names).
    return user_model.transform_input(features, feature_names)


def transform_output(user_model, features, feature_names, **kwargs):
    """Apply the user model's optional ``transform_output``, forwarding
    extra keyword arguments (e.g. ``meta=...``) only when its signature
    accepts them; models without the hook pass features through untouched.

    The original ``try/except TypeError`` retry could swallow a TypeError
    raised inside a new-style transform and silently call it twice;
    signature introspection avoids that.

    Parameters
    ----------
    user_model : object optionally exposing
        ``transform_output(features, feature_names[, **kw])``
    features : array-like payload extracted from the request
    feature_names : list of column names for the payload
    kwargs : optional extras such as ``meta`` — TODO confirm with callers

    Returns
    -------
    The transformed features, or ``features`` unchanged when the model
    defines no ``transform_output``.
    """
    import inspect  # local import keeps this block self-contained

    if not hasattr(user_model, "transform_output"):
        # Transformation is optional: identity by default.
        return features
    if kwargs:
        try:
            params = inspect.signature(
                user_model.transform_output).parameters
            accepts = any(
                p.kind is inspect.Parameter.VAR_KEYWORD
                for p in params.values()
            ) or all(name in params for name in kwargs)
        except (TypeError, ValueError):
            # Non-introspectable callable: use the legacy call form.
            accepts = False
        if accepts:
            return user_model.transform_output(
                features, feature_names, **kwargs)
    # Legacy transformers only take (features, feature_names).
    return user_model.transform_output(features, feature_names)

Expand Down Expand Up @@ -80,8 +87,9 @@ def TransformInput():
else:
features = get_data_from_json(request)
names = request.get("data", {}).get("names")
meta = get_meta_from_json(request)

transformed = transform_input(user_model, features, names)
transformed = transform_input(user_model, features, names, meta=meta)
logger.debug("Transformed: %s", transformed)

# If predictions is an numpy array or we used the default data then return as numpy array
Expand Down Expand Up @@ -114,8 +122,9 @@ def TransformOutput():
else:
features = get_data_from_json(request)
names = request.get("data", {}).get("names")
meta = get_meta_from_json(request)

transformed = transform_output(user_model, features, names)
transformed = transform_output(user_model, features, names, meta=meta)
logger.debug("Transformed: %s", transformed)

if isinstance(transformed, np.ndarray) or "data" in request:
Expand Down Expand Up @@ -150,10 +159,11 @@ def TransformInput(self, request, context):
return self.user_model.transform_input_grpc(request)
else:
features = get_data_from_proto(request)
meta = get_meta_from_proto(request)
datadef = request.data
data_type = request.WhichOneof("data_oneof")

transformed = transform_input(self.user_model, features, datadef.names)
transformed = transform_input(self.user_model, features, datadef.names, meta=meta)

# Construct meta data
meta = prediction_pb2.Meta()
Expand Down Expand Up @@ -184,9 +194,13 @@ def TransformOutput(self, request, context):
return self.user_model.transform_output_grpc(request)
else:
features = get_data_from_proto(request)
meta = get_meta_from_proto(request)
datadef = request.data
data_type = request.WhichOneof("data_oneof")

transformed = transform_output(
self.user_model, features, datadef.names, meta=meta)

# Construct meta data
meta = prediction_pb2.Meta()
metaJson = {}
Expand All @@ -198,8 +212,6 @@ def TransformOutput(self, request, context):
metaJson["metrics"] = metrics
json_format.ParseDict(metaJson, meta)

transformed = transform_output(
self.user_model, features, datadef.names)

if isinstance(transformed, np.ndarray) or data_type == "data":
transformed = np.array(transformed)
Expand Down Expand Up @@ -231,6 +243,6 @@ def get_grpc_server(user_model, debug=False, annotations={}, trace_interceptor=N
server = intercept_server(server, trace_interceptor)

prediction_pb2_grpc.add_TransformerServicer_to_server(seldon_model, server)
prediction_pb2_grpc.add_OutputTransformerServicer_to_server(seldon_model, server)
prediction_pb2_grpc.add_OutputTransformerServicer_to_server(seldon_model, server)

return server
4 changes: 2 additions & 2 deletions python/tests/test_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def start_microservice(app_location,tracing=False,grpc=False,envs={}):
p = Popen(cmd, cwd=app_location, env=env_vars, preexec_fn=os.setsid)

for q in range(10):
time.sleep(2)
time.sleep(5)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(("127.0.0.1", 5000))
if result == 0:
Expand Down Expand Up @@ -118,7 +118,7 @@ def test_model_template_app_grpc(tracing):
request = prediction_pb2.SeldonMessage(data=datadef)
feedback = prediction_pb2.Feedback(request=request,reward=1.0)
response = stub.SendFeedback(request=request)

def test_model_template_app_tracing_config():
envs = {"JAEGER_CONFIG_PATH":join(dirname(__file__), "tracing_config/tracing.yaml")}
with start_microservice(join(dirname(__file__), "model-template-app"),tracing=True,envs=envs):
Expand Down
Loading

0 comments on commit 714fa98

Please sign in to comment.