Skip to content

Commit

Permalink
Always return from trigger even if read from output topic times out
Browse files Browse the repository at this point in the history
(cherry picked from commit d5b70ad)
  • Loading branch information
cdbartholomew authored and eolivelli committed May 7, 2021
1 parent c9e071c commit b56a6d8
Showing 1 changed file with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,7 @@ public String triggerFunction(final String tenant,
log.error("Function in trigger function is not ready @ /{}/{}/{}", tenant, namespace, functionName);
throw new RestException(Status.BAD_REQUEST, "Function in trigger function is not ready");
}

String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
Reader<byte[]> reader = null;
Producer<byte[]> producer = null;
Expand Down Expand Up @@ -986,25 +987,22 @@ public String triggerFunction(final String tenant,
if (reader == null) {
return null;
}
long curTime = System.currentTimeMillis();
long maxTime = curTime + 1000;
while (curTime < maxTime) {
Message msg = reader.readNext(10000, TimeUnit.MILLISECONDS);
if (msg == null)
break;
if (msg.getProperties().containsKey("__pfn_input_msg_id__")
&& msg.getProperties().containsKey("__pfn_input_topic__")) {
MessageId newMsgId = MessageId.fromByteArray(
Base64.getDecoder().decode((String) msg.getProperties().get("__pfn_input_msg_id__")));

if (msgId.equals(newMsgId)
&& msg.getProperties().get("__pfn_input_topic__").equals(TopicName.get(inputTopicToWrite).toString())) {
return new String(msg.getData());
}

Message msg = reader.readNext(2500, TimeUnit.MILLISECONDS);

if (msg == null) {
return new String("No Message On Output Topic");
}

if (msg.getProperties().containsKey("__pfn_input_msg_id__")
&& msg.getProperties().containsKey("__pfn_input_topic__")) {
MessageId newMsgId = MessageId.fromByteArray(
Base64.getDecoder().decode((String) msg.getProperties().get("__pfn_input_msg_id__")));
if (msgId.equals(newMsgId)
&& msg.getProperties().get("__pfn_input_topic__").equals(TopicName.get(inputTopicToWrite).toString())) {
return new String(msg.getData());
}
curTime = System.currentTimeMillis();
}
throw new RestException(Status.REQUEST_TIMEOUT, "Request Timed Out");
} catch (SchemaSerializationException e) {
throw new RestException(Status.BAD_REQUEST, String.format("Failed to serialize input with error: %s. Please check if input data conforms with the schema of the input topic.", e.getMessage()));
} catch (IOException e) {
Expand All @@ -1017,6 +1015,9 @@ public String triggerFunction(final String tenant,
producer.closeAsync();
}
}

return null;

}

public FunctionState getFunctionState(final String tenant,
Expand Down

0 comments on commit b56a6d8

Please sign in to comment.