-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Description
Edited for grammar and clarity
Describe the bug
If a Function is created with an input based on a user-defined Type and a JSON message is sent in that is in a non-standard format (e.g. key is not surrounded by quotes), the function will fail, no acknowledgement is sent back to Pulsar for that message offset, and the function will be restarted. Because no acknowledgement was sent, it will try to process the same message and then fail again, not send an acknowledgement and restart, ad infinitum.
Pull Request #4004 was initially created due to this issue, but was closed due to consensus that the ObjectMapper should be kept as close to standard as possible which is fine, but defensive coding must be put in place to avoid the infinite loop.
The following options were tried, but ignored by Pulsar.
--auto-ack true
--max-message-retries <some finite number N, e.g. 1>
To Reproduce
Steps to reproduce the behavior:
- Create a Function that accepts a user-defined Type. Example provided.
pulsar2.zip - Deploy the function with either "--auto-ack true" or "--max-message-retries ".
- Tail the function log (/pulsar/logs////-0.log
- Send a message into the input-topic of the function that doesn't match the JSON standard
ex: bin/pulsar-client produce -m "{fruitType:"Orange"}" fruit-input
Expected behavior
The function should fail to convert, log the message to the log file, ack back, if "auto-ack true" is enabled, or retry the message N times, if "--max-message-retries N" is enabled, and then ack back, and then continue to accept new messages.
Screenshots
N/A
Desktop (please complete the following information):
- OS: Ubuntu 16.04.6 LTS
- Pulsar version: 2.3.0
Additional context
StackTrace:
13:01:31.320 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [fruit-input][public/default/FnFruit] Subscribing to topic on cnx [id: 0xc97183ef, L:/127.0.0.1:35264 - R:/127.0.0.1:6650]
13:01:31.393 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [fruit-input][public/default/FnFruit] Subscribed to topic on /127.0.0.1:6650 -- consumer: 0
13:03:00.450 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
13:03:00.498 [public/default/FnFruit-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/FnFruit:0] Uncaught exception in Java Instance
org.apache.pulsar.client.api.SchemaSerializationException: org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): was expecting double-quote to start field name
at [Source: (byte[])"{fruitType:"Orange"}"; line: 1, column: 3]
at org.apache.pulsar.client.impl.schema.JSONSchema.decode(JSONSchema.java:84) ~[java-instance.jar:2.3.0]
at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:233) ~[java-instance.jar:2.3.0]
at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:74) ~[java-instance.jar:2.3.0]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:458) ~[java-instance.jar:2.3.0]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:243) [java-instance.jar:2.3.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): was expecting double-quote to start field name
at [Source: (byte[])"{fruitType:"Orange"}"; line: 1, column: 3]
at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[java-instance.jar:2.3.0]
at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:669) ~[java-instance.jar:2.3.0]
at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:567) ~[java-instance.jar:2.3.0]
at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:1988) ~[java-instance.jar:2.3.0]
at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1639) ~[java-instance.jar:2.3.0]
at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:725) ~[java-instance.jar:2.3.0]
at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:151) ~[java-instance.jar:2.3.0]
at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013) ~[java-instance.jar:2.3.0]
at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) ~[java-instance.jar:2.3.0]
at org.apache.pulsar.client.impl.schema.JSONSchema.decode(JSONSchema.java:82) ~[java-instance.jar:2.3.0]
... 5 more
13:03:00.505 [public/default/FnFruit-0] INFO org.apache.pulsar.functions.instance.JavaInstanceRunnable - Closing instance
13:03:00.513 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [fruit-input] [public/default/FnFruit] Closed consumer
13:03:00.517 [public/default/FnFruit-0] INFO org.apache.pulsar.client.impl.ProducerImpl - [fruit-output] [standalone-4-6] Closed Producer