Skip to content

Commit

Permalink
Merge pull request #2653 from RafalSkolasinski/batch-feedback
Browse files Browse the repository at this point in the history
WIP: add feedback to batch processor
  • Loading branch information
axsaucedo committed Nov 18, 2020
2 parents 1c5513b + 707a78b commit 6d79282
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 27 deletions.
130 changes: 112 additions & 18 deletions python/seldon_core/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
CHOICES_GATEWAY_TYPE = ["ambassador", "istio", "seldon"]
CHOICES_TRANSPORT = ["rest", "grpc"]
CHOICES_PAYLOAD_TYPE = ["ndarray", "tensor", "tftensor"]
CHOICES_DATA_TYPE = ["data", "json", "str"]
CHOICES_METHOD = ["predict"]
CHOICES_DATA_TYPE = ["data", "json", "str", "raw"]
CHOICES_METHOD = ["predict", "feedback"]
CHOICES_LOG_LEVEL = {
"debug": logging.DEBUG,
"info": logging.INFO,
Expand Down Expand Up @@ -66,6 +66,11 @@ def start_multithreaded_batch_worker(
q_in = Queue(workers * 2)
q_out = Queue(workers * 2)

if method == "feedback" and data_type != "raw":
raise RuntimeError("Feedback method is supported only with raw data type.")
elif data_type == "raw" and method != "feedback":
raise RuntimeError("Raw input is currently only support for feedback method.")

sc = SeldonClient(
gateway=gateway_type,
transport=transport,
Expand All @@ -84,7 +89,7 @@ def start_multithreaded_batch_worker(
for _ in range(workers):
Thread(
target=_start_request_worker,
args=(q_in, q_out, data_type, sc, retries, batch_id),
args=(q_in, q_out, data_type, sc, method, retries, batch_id),
daemon=True,
).start()

Expand All @@ -108,7 +113,7 @@ def start_multithreaded_batch_worker(
t_out.join()

if benchmark:
logging.info(f"Elapsed time: {time.time() - start_time}")
logger.info(f"Elapsed time: {time.time() - start_time}")


def _start_input_file_worker(q_in: Queue, input_data_path: str) -> None:
Expand Down Expand Up @@ -159,15 +164,16 @@ def _start_output_file_worker(

counter += 1
if counter % 100 == 0:
logging.info(f"Processed instances: {counter}")
logging.info(f"Total processed instances: {counter}")
logger.info(f"Processed instances: {counter}")
logger.info(f"Total processed instances: {counter}")


def _start_request_worker(
q_in: Queue,
q_out: Queue,
data_type: str,
sc: SeldonClient,
method: str,
retries: int,
batch_id: str,
) -> None:
Expand All @@ -186,17 +192,36 @@ def _start_request_worker(
The json/str/data type to send the requests as
sc
An initialised Seldon Client configured to send the requests to
method:
Method to call: predict or feedback
retries
The number of attempts to try for each request
batch_id
The unique identifier for the batch which is passed to all requests
"""
while True:
batch_idx, batch_instance_id, input_raw = q_in.get()
str_output = _send_batch_predict(
batch_idx, batch_instance_id, input_raw, data_type, sc, retries, batch_id
)
# Mark task as done in the queue to add space for new tasks
if method == "predict":
str_output = _send_batch_predict(
batch_idx,
batch_instance_id,
input_raw,
data_type,
sc,
retries,
batch_id,
)
# Mark task as done in the queue to add space for new tasks
elif method == "feedback":
str_output = _send_batch_feedback(
batch_idx,
batch_instance_id,
input_raw,
data_type,
sc,
retries,
batch_id,
)
q_out.put(str_output)
q_in.task_done()

Expand All @@ -217,7 +242,6 @@ def _send_batch_predict(
traced back individually in the Seldon Request Logger context. Each request
will be attempted for the number of retries, and will return the string
serialised result.
Parameters
---
batch_idx
Expand All @@ -234,7 +258,6 @@ def _send_batch_predict(
The number of times to retry the request
batch_id
The unique identifier for the batch which is passed to all requests
Returns
---
A string serialised result of the response (or equivalent data with error info)
Expand All @@ -252,9 +275,7 @@ def _send_batch_predict(
predict_kwargs["headers"] = {"Seldon-Puid": batch_instance_id}
try:
data = json.loads(input_raw)
# TODO: Add functionality to send "raw" payload
if data_type == "data":
# TODO: Update client to avoid requiring a numpy array
data_np = np.array(data)
predict_kwargs["data"] = data_np
elif data_type == "str":
Expand All @@ -265,15 +286,88 @@ def _send_batch_predict(
str_output = None
for i in range(retries):
try:
# TODO: Add functionality for explainer
# as explainer currently doesn't support meta
# TODO: Optimize client to share session for requests
seldon_payload = sc.predict(**predict_kwargs)
assert seldon_payload.success
str_output = json.dumps(seldon_payload.response)
break
except (requests.exceptions.RequestException, AssertionError) as e:
print("Exception:", e, "retries:", retries)
logger.error("Exception:", e, "retries:", retries)
if i == (retries - 1):
raise

except Exception as e:
error_resp = {
"status": {"info": "FAILURE", "reason": str(e), "status": 1},
"meta": meta,
}
str_output = json.dumps(error_resp)

return str_output


def _send_batch_feedback(
batch_idx: int,
batch_instance_id: int,
input_raw: str,
data_type: str,
sc: SeldonClient,
retries: int,
batch_id: str,
) -> str:
"""
Send a request using the Seldon Client with feedback
Parameters
---
batch_idx
The enumerated index given to the batch datapoint in order of local dataset
batch_instance_id
The unique ID of the batch datapoint created with the python uuid function
input_raw
The raw input in string format to be loaded to the respective format
data_type
The data type to send which can be str, json and data
sc
The instance of SeldonClient to use to send the requests to the seldon model
retries
The number of times to retry the request
batch_id
The unique identifier for the batch which is passed to all requests
Returns
---
A string serialised result of the response (or equivalent data with error info)
"""

feedback_kwargs = {}
meta = {
"tags": {
"batch_id": batch_id,
"batch_instance_id": batch_instance_id,
"batch_index": batch_idx,
}
}
# Feedback Protos does not support meta - defined to include in file output only.
try:
data = json.loads(input_raw)
feedback_kwargs["raw_request"] = data

str_output = None
for i in range(retries):
try:
seldon_payload = sc.feedback(**feedback_kwargs)
assert seldon_payload.success

# Update Tags so we can track feedback instances in output file
tags = seldon_payload.response.get("meta", {}).get("tags", {})
tags.update(meta["tags"])
if "meta" not in seldon_payload.response:
seldon_payload.response["meta"] = {}
seldon_payload.response["meta"]["tags"] = tags
str_output = json.dumps(seldon_payload.response)
break
except (requests.exceptions.RequestException, AssertionError) as e:
logger.error("Exception:", e, "retries:", retries)
if i == (retries - 1):
raise

Expand Down
49 changes: 40 additions & 9 deletions python/seldon_core/seldon_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from seldon_core.utils import (
array_to_grpc_datadef,
seldon_message_to_json,
json_to_feedback,
json_to_seldon_message,
feedback_to_json,
seldon_messages_to_json,
Expand Down Expand Up @@ -403,6 +404,7 @@ def feedback(
self,
prediction_request: prediction_pb2.SeldonMessage = None,
prediction_response: prediction_pb2.SeldonMessage = None,
prediction_truth: prediction_pb2.SeldonMessage = None,
reward: float = 0,
gateway: str = None,
transport: str = None,
Expand All @@ -419,6 +421,7 @@ def feedback(
namespace: str = None,
gateway_prefix: str = None,
client_return_type: str = None,
raw_request: dict = None,
) -> SeldonClientFeedback:
"""
Expand Down Expand Up @@ -483,16 +486,25 @@ def feedback(
namespace=namespace,
gateway_prefix=gateway_prefix,
client_return_type=client_return_type,
raw_request=raw_request,
)
self._validate_args(**k)
if k["gateway"] == "ambassador" or k["gateway"] == "istio":
if k["transport"] == "rest":
return rest_feedback_gateway(
prediction_request, prediction_response, reward, **k
prediction_request,
prediction_response,
prediction_truth,
reward,
**k,
)
elif k["transport"] == "grpc":
return grpc_feedback_gateway(
prediction_request, prediction_response, reward, **k
prediction_request,
prediction_response,
prediction_truth,
reward,
**k,
)
else:
raise SeldonClientException("Unknown transport " + k["transport"])
Expand Down Expand Up @@ -2148,13 +2160,15 @@ def grpc_feedback_seldon_oauth(
def rest_feedback_gateway(
prediction_request: prediction_pb2.SeldonMessage = None,
prediction_response: prediction_pb2.SeldonMessage = None,
prediction_truth: prediction_pb2.SeldonMessage = None,
reward: float = 0,
deployment_name: str = "",
namespace: str = None,
gateway_endpoint: str = "localhost:8003",
headers: Dict = None,
gateway_prefix: str = None,
client_return_type: str = "proto",
raw_request: dict = None,
**kwargs,
) -> SeldonClientFeedback:
"""
Expand Down Expand Up @@ -2187,10 +2201,17 @@ def rest_feedback_gateway(
A Seldon Feedback Response
"""
request = prediction_pb2.Feedback(
request=prediction_request, response=prediction_response, reward=reward
)
payload = feedback_to_json(request)
if raw_request:
request = json_to_feedback(raw_request)
payload = raw_request
else:
request = prediction_pb2.Feedback(
request=prediction_request,
response=prediction_response,
reward=reward,
truth=prediction_truth,
)
payload = feedback_to_json(request)
if gateway_prefix is None:
if namespace is None:
response_raw = requests.post(
Expand Down Expand Up @@ -2248,6 +2269,7 @@ def rest_feedback_gateway(
def grpc_feedback_gateway(
prediction_request: prediction_pb2.SeldonMessage = None,
prediction_response: prediction_pb2.SeldonMessage = None,
prediction_truth: prediction_pb2.SeldonMessage = None,
reward: float = 0,
deployment_name: str = "",
namespace: str = None,
Expand All @@ -2256,6 +2278,7 @@ def grpc_feedback_gateway(
grpc_max_send_message_length: int = 4 * 1024 * 1024,
grpc_max_receive_message_length: int = 4 * 1024 * 1024,
client_return_type: str = "proto",
raw_request: dict = None,
**kwargs,
) -> SeldonClientFeedback:
"""
Expand Down Expand Up @@ -2288,9 +2311,17 @@ def grpc_feedback_gateway(
-------
"""
request = prediction_pb2.Feedback(
request=prediction_request, response=prediction_response, reward=reward
)
if isinstance(raw_request, prediction_pb2.Feedback):
request = raw_request
elif raw_request:
request = json_to_feedback(raw_request)
else:
request = prediction_pb2.Feedback(
request=prediction_request,
response=prediction_response,
reward=reward,
truth=prediction_truth,
)
channel = grpc.insecure_channel(
gateway_endpoint,
options=[
Expand Down

0 comments on commit 6d79282

Please sign in to comment.