Merge branch '7.3.x' into 7.4.x
rayokota committed Apr 18, 2023
2 parents 37654b3 + 4ee9cac · commit 569b100
Showing 11 changed files with 83 additions and 1 deletion.
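
Every file in this merge applies the same two-layer pattern: in the serializers and deserializers, a Schema Registry call that times out (surfacing as java.io.InterruptedIOException) is rethrown as org.apache.kafka.common.errors.TimeoutException; in the Connect converters, that TimeoutException is translated to org.apache.kafka.connect.errors.RetriableException so the Connect runtime can retry the record instead of failing the task with a DataException. Below is a minimal sketch of the pattern, not part of this commit — RetriableTimeoutSketch and callRegistryAndEncode are hypothetical stand-ins for the real serializer and its registry call:

    import java.io.InterruptedIOException;

    import org.apache.kafka.common.errors.TimeoutException;
    import org.apache.kafka.connect.errors.DataException;
    import org.apache.kafka.connect.errors.RetriableException;

    public class RetriableTimeoutSketch {

      // Serializer layer: a timed-out registry round trip becomes Kafka's
      // TimeoutException rather than a generic SerializationException.
      byte[] serialize(String topic, byte[] payload) {
        try {
          return callRegistryAndEncode(topic, payload);
        } catch (InterruptedIOException e) {
          throw new TimeoutException("Error serializing message", e);
        }
      }

      // Converter layer: TimeoutException maps to Connect's RetriableException,
      // which the runtime retries; everything else stays a hard DataException.
      public byte[] fromConnectData(String topic, byte[] payload) {
        try {
          return serialize(topic, payload);
        } catch (TimeoutException e) {
          throw new RetriableException(
              String.format("Failed to serialize data from topic %s: ", topic), e);
        } catch (RuntimeException e) {
          throw new DataException(
              String.format("Failed to serialize data from topic %s: ", topic), e);
        }
      }

      // Hypothetical stand-in for the real registry call, which can time out.
      private byte[] callRegistryAndEncode(String topic, byte[] payload)
          throws InterruptedIOException {
        return payload;
      }
    }
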
AvroConverter.java
@@ -31,10 +31,12 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.storage.Converter;

import java.util.Collections;
@@ -95,6 +97,11 @@ public byte[] fromConnectData(String topic, Headers headers, Schema schema, Obje
headers,
avroData.fromConnectData(schema, avroSchema, value),
new AvroSchema(avroSchema));
} catch (TimeoutException e) {
throw new RetriableException(
String.format("Failed to serialize Avro data from topic %s :", topic),
e
);
} catch (SerializationException e) {
throw new DataException(
String.format("Failed to serialize Avro data from topic %s :", topic),
@@ -131,6 +138,11 @@ public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value)
throw new DataException(
String.format("Unsupported type returned during deserialization of topic %s ", topic)
);
} catch (TimeoutException e) {
throw new RetriableException(
String.format("Failed to deserialize data for topic %s to Avro: ", topic),
e
);
} catch (SerializationException e) {
throw new DataException(
String.format("Failed to deserialize data for topic %s to Avro: ", topic),
AbstractKafkaAvroDeserializer.java
@@ -21,6 +21,7 @@
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -50,6 +51,7 @@
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;

public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaSchemaSerDe {
@@ -198,6 +200,12 @@ private Integer schemaVersion(String topic,
AvroSchema subjectSchema = (AvroSchema) schemaRegistry.getSchemaBySubjectAndId(subject, id);
version = schemaRegistry.getVersion(subject, subjectSchema);
return version;
} catch (InterruptedIOException e) {
String errorMessage = "Error retrieving Avro "
+ getSchemaType(isKey)
+ " schema version for id "
+ id;
throw new TimeoutException(errorMessage, e);
} catch (IOException e) {
throw new SerializationException("Error retrieving Avro "
+ getSchemaType(isKey)
@@ -384,6 +392,12 @@ AvroSchema schemaFromRegistry() {
String subjectName = isKey == null || strategyUsesSchema(isKey)
? getContext() : getSubject();
return (AvroSchema) schemaRegistry.getSchemaBySubjectAndId(subjectName, schemaId);
} catch (InterruptedIOException e) {
String errorMessage = "Error retrieving Avro "
+ getSchemaType(isKey)
+ " schema for id "
+ schemaId;
throw new TimeoutException(errorMessage, e);
} catch (IOException e) {
throw new SerializationException("Error retrieving Avro "
+ getSchemaType(isKey)
@@ -403,6 +417,12 @@ AvroSchema schemaForDeserialize() {
return isDeprecatedSubjectNameStrategy(isKey)
? AvroSchemaUtils.copyOf(schemaFromRegistry())
: (AvroSchema) schemaRegistry.getSchemaBySubjectAndId(getSubject(), schemaId);
} catch (InterruptedIOException e) {
String errorMessage = "Error retrieving Avro "
+ getSchemaType(isKey)
+ " schema for id "
+ schemaId;
throw new TimeoutException(errorMessage, e);
} catch (IOException e) {
throw new SerializationException("Error retrieving Avro "
+ getSchemaType(isKey)
AbstractKafkaAvroSerializer.java
@@ -25,6 +25,7 @@
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;
@@ -36,6 +37,7 @@
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;

public abstract class AbstractKafkaAvroSerializer extends AbstractKafkaSchemaSerDe {
@@ -153,6 +155,8 @@ protected byte[] serializeImpl(
return bytes;
} catch (ExecutionException ex) {
throw new SerializationException("Error serializing Avro message", ex.getCause());
} catch (InterruptedIOException e) {
throw new TimeoutException("Error serializing Avro message", e);
} catch (IOException | RuntimeException e) {
// avro serialization can throw AvroRuntimeException, NullPointerException,
// ClassCastException, etc
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -22,7 +22,7 @@
files="(Errors|AvroMessageReader).java"/>

<suppress checks="CyclomaticComplexity"
files="(AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AbstractKafkaSchemaSerDe|AvroSchema|AvroSchemaUtils|CompatibilityResource|ConfigResource|Context|ContextKey|KafkaSchemaRegistry|KafkaStore|KafkaStoreMessageHandler|KafkaStoreReaderThread|AvroData|DownloadSchemaRegistryMojo|MockSchemaRegistryClient|SchemaRegistrySerializer|SchemaValue|SubjectVersionsResource|ProtobufSchema|SchemaDiff|FieldSchemaDiff|MessageSchemaDiff|DynamicSchema|SchemaMessageFormatter|ProtobufData|JsonSchema|JSON.*|AbstractKafkaJsonSchemaDeserializer|JsonSchemaData|JsonSchemaUtils|MessageDefinition|ProtobufSchemaUtils|SchemaMessageReader|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|SubjectKeyComparator|ContextFilter|QualifiedSubject|Schema|AvroTypeDescription|CelExecutor|Rule|WildcardMatcher|JsonSchemaComparator).java"/>
files="(AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AbstractKafkaSchemaSerDe|AvroSchema|AvroSchemaUtils|CompatibilityResource|ConfigResource|Context|ContextKey|KafkaSchemaRegistry|KafkaStore|KafkaStoreMessageHandler|KafkaStoreReaderThread|AvroData|DownloadSchemaRegistryMojo|MockSchemaRegistryClient|SchemaRegistrySerializer|SchemaValue|SubjectVersionsResource|ProtobufSchema|SchemaDiff|FieldSchemaDiff|MessageSchemaDiff|DynamicSchema|SchemaMessageFormatter|ProtobufData|JsonSchema|JSON.*|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaJsonSchemaSerializer|JsonSchemaData|JsonSchemaUtils|MessageDefinition|ProtobufSchemaUtils|SchemaMessageReader|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|SubjectKeyComparator|ContextFilter|QualifiedSubject|Schema|AvroTypeDescription|CelExecutor|Rule|WildcardMatcher|JsonSchemaComparator).java"/>

<suppress checks="NPathComplexity"
files="(AvroData|ConfigResource|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|MessageDefinition|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaSchemaSerDe|AbstractKafkaAvroSerializer|AbstractKafkaAvroDeserializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|ContextFilter|QualifiedSubject|SubjectVersionsResource|Rule|WildcardMatcher|JsonSchemaComparator).java"/>
JsonSchemaConverter.java
@@ -21,10 +21,12 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.storage.Converter;

import java.util.Collections;
@@ -94,6 +96,12 @@ public byte[] fromConnectData(String topic, Headers headers, Schema schema, Obje
JsonNode jsonValue = jsonSchemaData.fromConnectData(schema, value);
try {
return serializer.serialize(topic, headers, isKey, jsonValue, jsonSchema);
} catch (TimeoutException e) {
throw new RetriableException(String.format("Converting Kafka Connect data to byte[] failed "
+ "due to serialization error of topic %s: ",
topic),
e
);
} catch (SerializationException e) {
throw new DataException(String.format("Converting Kafka Connect data to byte[] failed due to "
+ "serialization error of topic %s: ",
@@ -126,6 +134,12 @@ public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value)
Schema schema = jsonSchemaData.toConnectSchema(jsonSchema);
return new SchemaAndValue(schema, jsonSchemaData.toConnectData(schema,
(JsonNode) deserialized.getValue()));
} catch (TimeoutException e) {
throw new RetriableException(String.format("Converting byte[] to Kafka Connect data failed "
+ "due to serialization error of topic %s: ",
topic),
e
);
} catch (SerializationException e) {
throw new DataException(String.format("Converting byte[] to Kafka Connect data failed due to "
+ "serialization error of topic %s: ",
AbstractKafkaJsonSchemaDeserializer.java
@@ -22,12 +22,14 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;
import org.everit.json.schema.CombinedSchema;
import org.everit.json.schema.ReferenceSchema;
@@ -228,6 +230,8 @@ protected Object deserialize(
}

return value;
} catch (InterruptedIOException e) {
throw new TimeoutException("Error deserializing JSON message for id " + id, e);
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error deserializing JSON message for id " + id, e);
} catch (RestClientException e) {
AbstractKafkaJsonSchemaSerializer.java
@@ -21,9 +21,11 @@
import com.fasterxml.jackson.databind.SerializationFeature;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode;
import io.confluent.kafka.schemaregistry.json.SpecificationVersion;
import java.io.InterruptedIOException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;
import org.everit.json.schema.ValidationException;

@@ -147,6 +149,8 @@ protected byte[] serializeImpl(
byte[] bytes = out.toByteArray();
out.close();
return bytes;
} catch (InterruptedIOException e) {
throw new TimeoutException("Error serializing JSON message", e);
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error serializing JSON message", e);
} catch (RestClientException e) {
ProtobufConverter.java
@@ -21,10 +21,12 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.storage.Converter;

import java.util.Collections;
@@ -102,6 +104,11 @@ public byte[] fromConnectData(String topic, Headers headers, Schema schema, Obje
} else {
throw new DataException("Unsupported object of class " + v.getClass().getName());
}
} catch (TimeoutException e) {
throw new RetriableException(String.format(
"Failed to serialize Protobuf data from topic %s :",
topic
), e);
} catch (SerializationException e) {
throw new DataException(String.format(
"Failed to serialize Protobuf data from topic %s :",
@@ -138,6 +145,11 @@ public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value)
topic
));
}
} catch (TimeoutException e) {
throw new RetriableException(String.format(
"Failed to deserialize data for topic %s to Protobuf: ",
topic
), e);
} catch (SerializationException e) {
throw new DataException(String.format(
"Failed to deserialize data for topic %s to Protobuf: ",
AbstractKafkaProtobufDeserializer.java
@@ -23,6 +23,7 @@
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode;
import io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -44,6 +45,7 @@
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;

public abstract class AbstractKafkaProtobufDeserializer<T extends Message>
@@ -220,6 +222,8 @@ protected Object deserialize(
}

return value;
} catch (InterruptedIOException e) {
throw new TimeoutException("Error deserializing Protobuf message for id " + id, e);
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error deserializing Protobuf message for id " + id, e);
} catch (RestClientException e) {
AbstractKafkaProtobufSerializer.java
@@ -20,6 +20,7 @@
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode;
import java.io.InterruptedIOException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.SerializationException;
@@ -40,6 +41,7 @@
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe;
import io.confluent.kafka.serializers.subject.strategy.ReferenceSubjectNameStrategy;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;

public abstract class AbstractKafkaProtobufSerializer<T extends Message>
@@ -146,6 +148,8 @@ useLatestForDeps, latestCompatStrict, latestVersionsCache(),
byte[] bytes = out.toByteArray();
out.close();
return bytes;
} catch (InterruptedIOException e) {
throw new TimeoutException("Error serializing Protobuf message", e);
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error serializing Protobuf message", e);
} catch (RestClientException e) {
KafkaProtobufSerializer.java
@@ -21,8 +21,10 @@
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

@@ -95,6 +97,8 @@ public byte[] serialize(String topic, Headers headers, T record) {
schema = resolveDependencies(schemaRegistry, normalizeSchema, autoRegisterForDeps,
useLatestForDeps, latestCompatStrict, latestVersionsCache(),
skipKnownTypes, referenceSubjectNameStrategy, topic, isKey, schema);
} catch (InterruptedIOException e) {
throw new TimeoutException("Error serializing Protobuf message", e);
} catch (IOException | RestClientException e) {
throw new SerializationException("Error serializing Protobuf message", e);
}
