Skip to content

Commit

Permalink
Fix exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
akhileshm1 committed Aug 10, 2023
1 parent 049ba7b commit 9ef14fc
Show file tree
Hide file tree
Showing 27 changed files with 76 additions and 24 deletions.
Expand Up @@ -134,7 +134,7 @@ public Object deserialize(String topic, byte[] payload) throws SerializationExce
}

@Override
public void close() {
public void close() throws IOException {
if (keyDeserializer != null) {
keyDeserializer.close();
}
Expand Down
Expand Up @@ -173,7 +173,7 @@ public byte[] serialize(
}

@Override
public void close() {
public void close() throws IOException {
if (keySerializer != null) {
keySerializer.close();
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
import org.apache.avro.Schema;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
Expand Down Expand Up @@ -62,6 +63,10 @@ public Object deserialize(String topic, byte[] bytes, Schema readerSchema) {

@Override
public void close() {
super.close();
try {
super.close();
} catch (IOException e) {
throw new RuntimeException("Exception while closing deserializer", e);
}
}
}
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.serialization.Serializer;

import java.io.IOException;
import java.util.Map;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
Expand Down Expand Up @@ -64,6 +65,10 @@ public byte[] serialize(String topic, Object record) {

@Override
public void close() {
super.close();
try {
super.close();
} catch (IOException e) {
throw new RuntimeException("Exception while closing serializer", e);
}
}
}
Expand Up @@ -683,7 +683,7 @@ public synchronized void reset() {
}

@Override
public void close() {
public void close() throws IOException {
if (restService != null) {
restService.close();
}
Expand Down
Expand Up @@ -1255,7 +1255,7 @@ public void setProxy(String proxyHost, int proxyPort) {
}

@Override
public void close() {
public void close() throws IOException {
if (bearerAuthCredentialProvider != null) {
bearerAuthCredentialProvider.close();
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.Configurable;

import java.io.Closeable;
import java.io.IOException;
import java.net.URL;

public interface BearerAuthCredentialProvider extends Closeable, Configurable {
Expand Down
Expand Up @@ -141,7 +141,11 @@ protected ResourceCollection getStaticResources() {
public void onShutdown() {

if (schemaRegistry != null) {
schemaRegistry.close();
try {
schemaRegistry.close();
} catch (IOException e) {
log.error("Error closing schema registry", e);
}
}

if (schemaRegistryResourceExtensions != null) {
Expand Down
Expand Up @@ -1516,7 +1516,7 @@ private CloseableIterator<SchemaRegistryValue> allVersionsFromAllContexts(
}

@Override
public void close() {
public void close() throws IOException {
log.info("Shutting down schema registry");
kafkaStore.close();
if (leaderElector != null) {
Expand Down
Expand Up @@ -15,6 +15,7 @@

package io.confluent.kafka.schemaregistry.storage;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -91,7 +92,7 @@ List<String> isCompatible(String subject,
Schema newSchema,
List<Schema> previousSchemas) throws SchemaRegistryException;

void close();
void close() throws IOException;

void deleteSchemaVersion(String subject, Schema schema,
boolean permanentDelete) throws SchemaRegistryException;
Expand Down
Expand Up @@ -85,7 +85,7 @@ protected void doIteration(PerformanceStats.Callback cb) {
}

@Override
protected void close() {
protected void close() throws IOException {
if (client != null) {
client.close();
}
Expand Down
Expand Up @@ -143,7 +143,7 @@ protected void doIteration(PerformanceStats.Callback cb) {
cb.onCompletion(1, 0);
}

protected void close() {
protected void close() throws IOException {
// We can see some failures due to things like timeouts, but we want it to be obvious
// if there are too many failures (indicating a real underlying problem). 1% is an arbitrarily
// chosen limit.
Expand Down
Expand Up @@ -131,7 +131,7 @@ public JsonNode deserialize(String topic, byte[] payload) throws SerializationEx
}

@Override
public void close() {
public void close() throws IOException {
if (keyDeserializer != null) {
keyDeserializer.close();
}
Expand Down
Expand Up @@ -170,7 +170,7 @@ public byte[] serialize(
}

@Override
public void close() {
public void close() throws IOException {
if (keySerializer != null) {
keySerializer.close();
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
Expand Down Expand Up @@ -81,6 +82,10 @@ public T deserialize(String topic, byte[] bytes) {

@Override
public void close() {
super.close();
try {
super.close();
} catch (IOException e) {
throw new RuntimeException("Exception while closing deserializer", e);
}
}
}
Expand Up @@ -90,6 +90,10 @@ private JsonSchema getSchema(T record) {

@Override
public void close() {
super.close();
try {
super.close();
} catch (IOException e) {
throw new RuntimeException("Exception while closing serializer", e);
}
}
}
Expand Up @@ -29,6 +29,7 @@

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand Down Expand Up @@ -175,7 +176,11 @@ public void execute() throws MojoExecutionException, MojoFailureException {
);
}
}
close();
try {
close();
} catch (IOException e) {
throw new MojoExecutionException("Exception while closing schema registry client", e);
}
}

private String getExtension(ParsedSchema parsedSchema) {
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.maven.plugins.annotations.Parameter;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -101,7 +102,7 @@ private List<SchemaProvider> defaultSchemaProviders() {
}

@Override
public void close() {
public void close() throws IOException {
if (client != null) {
client.close();
}
Expand Down
Expand Up @@ -149,7 +149,7 @@ public Message deserialize(String topic, byte[] payload) throws SerializationExc
}

@Override
public void close() {
public void close() throws IOException {
if (keyDeserializer != null) {
keyDeserializer.close();
}
Expand Down
Expand Up @@ -17,6 +17,8 @@

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;

import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
Expand Down Expand Up @@ -158,7 +160,7 @@ public byte[] serialize(
}

@Override
public void close() {
public void close() throws IOException {
if (keySerializer != null) {
keySerializer.close();
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.google.protobuf.Message;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
Expand Down Expand Up @@ -78,6 +79,10 @@ public T deserialize(String topic, byte[] bytes) {

@Override
public void close() {
super.close();
try {
super.close();
} catch (IOException e) {
throw new RuntimeException("Exception while closing deserializer", e);
}
}
}
Expand Up @@ -103,6 +103,10 @@ public byte[] serialize(String topic, T record) {

@Override
public void close() {
super.close();
try {
super.close();
} catch (IOException e) {
throw new RuntimeException("Exception while closing serializer", e);
}
}
}
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.serialization.Deserializer;

import java.io.Closeable;
import java.io.IOException;

public interface SchemaMessageDeserializer<T> extends Closeable {

Expand Down
Expand Up @@ -199,7 +199,11 @@ public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream o
@Override
public void close() {
if (deserializer != null) {
deserializer.close();
try {
deserializer.close();
} catch (IOException e) {
throw new RuntimeException("Exception while closing deserializer", e);
}
}
}

Expand Down
Expand Up @@ -383,7 +383,11 @@ private byte[] serializeNonSchemaKey(String keyString) {
@Override
public void close() {
if (serializer != null) {
serializer.close();
try {
serializer.close();
} catch (IOException e) {
throw new RuntimeException("Exception while closing serializer", e);
}
}
}
}
Expand Up @@ -21,6 +21,7 @@
import io.confluent.kafka.schemaregistry.ParsedSchema;

import java.io.Closeable;
import java.io.IOException;

public interface SchemaMessageSerializer<T> extends Closeable {

Expand Down
Expand Up @@ -265,7 +265,7 @@ protected static KafkaException toKafkaException(RestClientException e, String e
}

@Override
public void close() {
public void close() throws IOException {
if (schemaRegistry != null) {
schemaRegistry.close();
}
Expand Down

0 comments on commit 9ef14fc

Please sign in to comment.