Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail without enqueueing iff the airbyte message type is unrecognized #40254

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ corresponds to that version.
### Java CDK

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) |
| 0.40.3 | 2024-06-18 | [\#39526](https://github.com/airbytehq/airbyte/pull/39526) | Destinations: INCOMPLETE stream status is a TRANSIENT error rather than SYSTEM |
| 0.40.2 | 2024-06-18 | [\#39552](https://github.com/airbytehq/airbyte/pull/39552) | Destinations: Throw error if the ConfiguredCatalog has no streams |
| 0.40.1 | 2024-06-14 | [\#39349](https://github.com/airbytehq/airbyte/pull/39349) | Source stats for full refresh streams |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,15 @@ constructor(
* do it without touching buffer manager.
*/
val partialAirbyteMessage =
airbyteMessageDeserializer.deserializeAirbyteMessage(
message,
)
try {
airbyteMessageDeserializer.deserializeAirbyteMessage(
message,
)
} catch (e: AirbyteMessageDeserializer.UnrecognizedAirbyteMessageTypeException) {
logger.warn { "Ignoring unrecognized message type: ${e.message}" }
return
}

when (partialAirbyteMessage.type) {
AirbyteMessage.Type.RECORD -> {
validateRecord(partialAirbyteMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
package io.airbyte.cdk.integrations.destination.async.deser

import com.fasterxml.jackson.databind.exc.ValueInstantiationException
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
Expand All @@ -13,16 +14,27 @@ private val logger = KotlinLogging.logger {}
class AirbyteMessageDeserializer(
private val dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer(),
) {
/**
 * Raised when the top-level `type` of an incoming message is not a known message type.
 *
 * The exception [message] carries only the unrecognized type name, while [toString] renders a
 * fuller human-readable description.
 *
 * @param unrecognizedType the raw type name that could not be mapped
 */
class UnrecognizedAirbyteMessageTypeException(private val unrecognizedType: String) :
    Exception(unrecognizedType) {
    override fun toString(): String =
        "Could not deserialize AirbyteMessage: unrecognized type: $unrecognizedType"
}

/**
* Deserializes to a [PartialAirbyteMessage] which can represent both a Record or a State
* Message
* Deserializes to a [PartialAirbyteMessage], which can represent a Record, State, or Trace
* Message.
*
* Throws on deserialization errors, obfuscating the error message to avoid data leakage. In
* recoverable cases (currently only when the top-level message type is unrecognized), throws a
* dedicated exception.
*
* PartialAirbyteMessage holds either:
* * entire serialized message string when message is a valid State Message
* * serialized AirbyteRecordMessage when message is a valid Record Message
*
* @param message the string to deserialize
* @return PartialAirbyteMessage if the message is valid, empty otherwise
* @return PartialAirbyteMessage if the message is valid
*/
fun deserializeAirbyteMessage(
message: String?,
Expand All @@ -32,8 +44,29 @@ class AirbyteMessageDeserializer(
// Use JsonSubTypes and extend StdDeserializer to properly handle this.
// Make immutability a first class citizen in the PartialAirbyteMessage class.
val partial =
Jsons.tryDeserializeExact(message, PartialAirbyteMessage::class.java).orElseThrow {
RuntimeException("Unable to deserialize PartialAirbyteMessage.")
try {
Jsons.deserializeExactUnchecked(message, PartialAirbyteMessage::class.java)
} catch (e: ValueInstantiationException) {
// This is a hack to catch unrecognized message types. Jackson supports
// the equivalent via annotations, but we cannot use them because the
// AirbyteMessage
// is generated from json-schema.
val pat =
Regex("Cannot construct instance of .*AirbyteMessage.Type., problem: ([_A-Z]+)")
val match = pat.find(e.message!!)
if (match != null) {
val unrecognized = match.groups[1]?.value
logger.warn { "Unrecognized message type: $unrecognized" }
throw UnrecognizedAirbyteMessageTypeException(unrecognized!!)
} else {
val obfuscated = Jsons.obfuscateDeserializationException(e)
throw RuntimeException(
"ValueInstantiationException when deserializing PartialAirbyteMessage: $obfuscated"
)
}
} catch (e: Exception) {
val obfuscated = Jsons.obfuscateDeserializationException(e)
throw RuntimeException("Could not deserialize PartialAirbyteMessage: $obfuscated")
}

val msgType = partial.type
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.40.3
version=0.40.4
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import java.util.function.Consumer
import java.util.stream.Stream
import org.apache.commons.lang3.RandomStringUtils
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
Expand Down Expand Up @@ -732,4 +733,38 @@ class AsyncStreamConsumerTest {
}
assertEquals(expRecords, actualRecords)
}

/**
 * Verifies that the consumer accepts a message whose top-level `type` value is not a known
 * [AirbyteMessage.Type] without throwing.
 */
@Test
internal fun deserializeAirbyteMessageWithUnrecognizedType() {
// Start from a serialized RECORD message so that only the type value needs to be rewritten.
val airbyteMessage = AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
val serialized = Jsons.serialize(airbyteMessage)
edgao marked this conversation as resolved.
Show resolved Hide resolved
// Fake an upstream protocol change by swapping the type name for one that does not exist.
val retyped =
serialized.replace(AirbyteMessage.Type.RECORD.toString(), "__UNKNOWN_TYPE_OF_MESSAGE__")
// Assert that accepting the retyped message doesn't throw an exception
consumer.start()
assertDoesNotThrow { consumer.accept(retyped, retyped.length) }
}

/**
 * Verifies that an unrecognized enum value somewhere other than the top-level message type
 * makes the consumer throw, and that the thrown message does not contain the offending value.
 */
@Test
internal fun deserializeAirbyteMessageWithUnrecognizedNonTypeEnum() {
// NOTE: Only an unrecognized top-level message type is tolerated. Any other unrecognized
// enum value should break deserialization and result in an *obfuscated* error message.
val airbyteMessage =
AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withState(
AirbyteStateMessage().withType(AirbyteStateMessage.AirbyteStateType.STREAM)
)
val serialized = Jsons.serialize(airbyteMessage)
// Fake an upstream protocol change by replacing the nested state-type value
val offender = "__UNKNOWN_NONTYPE_ENUM__"
val retyped = serialized.replace("STREAM", offender)
// Assert that accepting the message throws a RuntimeException
consumer.start()
val throwable =
assertThrows(RuntimeException::class.java) { consumer.accept(retyped, retyped.length) }
// Ensure that the offending data has been scrubbed from the error message
assertFalse(throwable.message!!.contains(offender))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ object Jsons {
}
}

// Deserializes jsonString into an instance of klass using OBJECT_MAPPER_EXACT, without
// catching or wrapping anything the mapper throws.
//
// WARNING: This method throws bare exceptions on parse failure which might
// leak sensitive data. Use obfuscateDeserializationException() to strip
// the sensitive data before logging.
@JvmStatic
johnny-schmidt marked this conversation as resolved.
Show resolved Hide resolved
fun <T : Any> deserializeExactUnchecked(jsonString: String?, klass: Class<T>?): T {
return OBJECT_MAPPER_EXACT.readValue(jsonString, klass)
}

@JvmStatic
fun <T : Any> tryDeserialize(jsonString: String, klass: Class<T>): Optional<T> {
return try {
Expand Down Expand Up @@ -425,9 +433,17 @@ object Jsons {
* potentially-sensitive information. </snip...>
*/
private fun <T : Any> handleDeserThrowable(throwable: Throwable): Optional<T> =
    // Only the scrubbed rendering from obfuscateDeserializationException() is logged, so the
    // raw exception message never reaches the log output.
    obfuscateDeserializationException(throwable).let { scrubbed ->
        LOGGER.warn { "Failed to deserialize json due to $scrubbed" }
        // Deserialization failed, so the caller gets an empty result.
        Optional.empty<T>()
    }

/**
* Build a stacktrace from the given throwable, enabling us to log or rethrow without leaking
* sensitive information in the exception message (which would be exposed by, e.g.,
* ExceptionUtils.getStackTrace(t)).
*/
fun obfuscateDeserializationException(throwable: Throwable): String {
var t: Throwable = throwable
val sb = StringBuilder()
sb.append(t.javaClass)
Expand All @@ -444,8 +460,7 @@ object Jsons {
sb.append(traceElement.toString())
}
}
LOGGER.warn { "Failed to deserialize json due to $sb" }
return Optional.empty()
return sb.toString()
}

/**
Expand Down
Loading