From 73fd40cef6515e518f750904309479d6e0575a36 Mon Sep 17 00:00:00 2001 From: "christian.edelsbrunn" Date: Tue, 15 Dec 2020 19:15:09 +0100 Subject: [PATCH] Adding SSL for Schema Registry, SASL for Broker and upgrade to confluent 6.0 --- README.md | 55 +++++++++++ build.gradle | 15 ++- src/main/java/at/esque/kafka/Controller.java | 4 +- .../esque/kafka/CrossClusterController.java | 2 +- .../SchemaRegistryBrowserController.java | 14 ++- .../at/esque/kafka/cluster/ClusterConfig.java | 91 +++++++++++++++++++ .../kafka/cluster/KafkaesqueAdminClient.java | 3 +- .../esque/kafka/handlers/ConfigHandler.java | 49 ++++++++++ .../esque/kafka/handlers/ConsumerHandler.java | 2 + .../esque/kafka/handlers/ProducerHandler.java | 10 ++ .../serialization/ExtendedJsonDecoder.java | 91 +++++++++++++++++-- 11 files changed, 315 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 4e34982..fa83fb7 100644 --- a/README.md +++ b/README.md @@ -25,3 +25,58 @@ Allows for defining and configurating topics once and apply them to different cl ### Message Books Allows for playing a set of Messages over different topics into a cluster, see the [Wiki]("https://github.com/patschuh/KafkaEsque/wiki/Message-Books") for details *** + +### Authentication +At the moment the UI only supports cluster configs without any authentication +Within the cluster.json file it is possible to configure Authentication for Kafka and Confluent Schema Registry: +Note: the secrets have to be given in the json file in plain text. This might be a security issue. Feel free to apply a PR if you want to improve this. 
+ 
+sslEnabled controls the SSL Authentication method 
+ 
+###### Example for SSL with mTLS Authentication to the broker: 
+ ``` 
+ { 
+ "identifier": "my-mtls-secured-cluster", 
+ "bootstrapServers": "broker:", 
+ "sslEnabled": true, 
+ "keyStoreLocation": "mykeystore.jks", 
+ "keyStorePassword": "mykeystorepw", 
+ "trustStoreLocation": "mytruststore.jks", 
+ "trustStorePassword": "mykeystorepw" 
+ } 
+ ``` 
+###### Example for SASL_SSL Authentication 
+ 
+saslSecurityProtocol, saslMechanism and saslJaasConfig can be provided 
+This can also be combined with given trust and keystore configuration 
+ 
+ ``` 
+ { 
+ "identifier": "my-mtls-secured-cluster", 
+ "bootstrapServers": "broker:", 
+ "saslSecurityProtocol": "SASL_SSL", 
+ "saslMechanism" : "PLAIN", 
+ "saslJaasConfig" : "org.apache.kafka.common.security.plain.PlainLoginModule required serviceName=kafka username=\"MYUSER\" password=\"53CR37\";" 
+ } 
+ ``` 
+ 
+###### Example with Schema Registry with HTTPS and Basic Auth 
+ 
+The http**s** and 'sslEnabled' are important if you want to use a truststore and/or keystore; otherwise those attributes are ignored and no sslContext is provided to the Schema Registry client 
+ 
+you can use only Basic Auth if your SR is only protected with basic auth, you can use only keystore+truststore if your SR is protected with mTLS or you can use both settings in parallel. 
+ 
+ ``` 
+ { 
+ .... 
+ "schemaRegistry": "https://myschemaregistry:8081", 
+ "schemaRegistryBasicAuthUserInfo": ":", 
+ ... 
+ "sslEnabled": true, + "keyStoreLocation": "mykeystore.jks", + "keyStorePassword": "mykeystorepw", + "trustStoreLocation": "mytruststore.jks", + "trustStorePassword": "mykeystorepw" + } + ``` + diff --git a/build.gradle b/build.gradle index 5e900eb..b5f53e5 100644 --- a/build.gradle +++ b/build.gradle @@ -28,7 +28,7 @@ jfx { repositories { mavenLocal() - + maven { url = 'https://repo1.maven.org/maven2' } @@ -36,10 +36,15 @@ repositories { maven { url = 'https://packages.confluent.io/maven/' } + + maven { + url = 'https://repository.mulesoft.org/nexus/content/repositories/public/' + } } dependencies { - compile 'org.apache.kafka:kafka-clients:2.0.0' + compile group: 'org.codehaus.jackson', name: 'jackson-mapper-asl', version: '1.9.13' + compile 'org.apache.kafka:kafka-clients:2.5.1' compile 'com.fasterxml.jackson.core:jackson-databind:2.9.9' compile 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.9.9' compile 'org.slf4j:slf4j-simple:1.7.25' @@ -47,12 +52,12 @@ dependencies { compile 'org.kordamp.ikonli:ikonli-javafx:2.4.0' compile 'org.kordamp.ikonli:ikonli-fontawesome-pack:2.4.0' compile 'com.opencsv:opencsv:4.0' - compile 'io.confluent:kafka-schema-registry:5.0.0' - compile 'io.confluent:kafka-avro-serializer:5.0.0' + compile 'io.confluent:kafka-schema-registry:6.0.0' + compile 'io.confluent:kafka-avro-serializer:6.0.0' compile 'com.google.inject:guice:4.2.2' compile 'org.fxmisc.richtext:richtextfx:0.10.5' compile 'net.thisptr:jackson-jq:1.0.0-preview.20191208' - + testCompile 'junit:junit:4.12' testCompile 'org.springframework.kafka:spring-kafka-test:2.2.0.RELEASE' testCompile 'com.google.code.tempus-fugit:tempus-fugit:1.1' diff --git a/src/main/java/at/esque/kafka/Controller.java b/src/main/java/at/esque/kafka/Controller.java index af67148..59a0c9f 100644 --- a/src/main/java/at/esque/kafka/Controller.java +++ b/src/main/java/at/esque/kafka/Controller.java @@ -230,7 +230,7 @@ public void setup(Stage controlledStage) { 
specificOffsetTextField.setVisible(newValue == FetchTypes.SPECIFIC_OFFSET)); clusterComboBox.getSelectionModel().selectedItemProperty().addListener((observable, oldValue, newValue) -> { - adminClient = new KafkaesqueAdminClient(newValue.getBootStrapServers(), configHandler.getSslProperties(selectedCluster())); + adminClient = new KafkaesqueAdminClient(newValue.getBootStrapServers(), configHandler.getSslProperties(selectedCluster()), configHandler.getSaslProperties(selectedCluster())); refreshTopicList(newValue); }); @@ -536,7 +536,7 @@ public void schemaRegistryClick(ActionEvent event) { FXMLLoader fxmlLoader = new FXMLLoader(getClass().getResource("/fxml/schemaRegistryBrowser.fxml")); Parent root1 = fxmlLoader.load(); SchemaRegistryBrowserController controller = fxmlLoader.getController(); - controller.setup(selectedConfig.getSchemaRegistry()); + controller.setup(selectedConfig,configHandler); Stage stage = new Stage(); stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png"))); stage.initModality(Modality.APPLICATION_MODAL); diff --git a/src/main/java/at/esque/kafka/CrossClusterController.java b/src/main/java/at/esque/kafka/CrossClusterController.java index 6a1e550..d891a5c 100644 --- a/src/main/java/at/esque/kafka/CrossClusterController.java +++ b/src/main/java/at/esque/kafka/CrossClusterController.java @@ -143,7 +143,7 @@ private void setupClusterControls(ClusterConfig clusterConfig, KafkaesqueAdminCl if (adminClient != null) { adminClient.close(); } - adminClient = new KafkaesqueAdminClient(clusterConfig.getBootStrapServers(), configHandler.getSslProperties(clusterConfig)); + adminClient = new KafkaesqueAdminClient(clusterConfig.getBootStrapServers(), configHandler.getSslProperties(clusterConfig),configHandler.getSaslProperties(clusterConfig)); KafkaesqueAdminClient finalAdminClient = adminClient; runInDaemonThread(() -> { ObservableList topics = FXCollections.observableArrayList(finalAdminClient.getTopics()); diff --git 
a/src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java b/src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java index aafc97e..36c5099 100644 --- a/src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java +++ b/src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java @@ -2,9 +2,11 @@ import at.esque.kafka.alerts.ConfirmationAlert; import at.esque.kafka.alerts.ErrorAlert; +import at.esque.kafka.cluster.ClusterConfig; import at.esque.kafka.controls.FilterableListView; import at.esque.kafka.controls.JsonTreeView; import at.esque.kafka.controls.KafkaEsqueCodeArea; +import at.esque.kafka.handlers.ConfigHandler; import io.confluent.kafka.schemaregistry.client.rest.RestService; import javafx.collections.FXCollections; import javafx.event.ActionEvent; @@ -37,8 +39,16 @@ public class SchemaRegistryBrowserController { private JsonTreeView jsonTreeView; - public void setup(String schemaregistryUrl) { - schemaRegistryRestService = new RestService(schemaregistryUrl); + public void setup(ClusterConfig selectedConfig, ConfigHandler configHandler) { + schemaRegistryRestService = new RestService(selectedConfig.getSchemaRegistry()); + + if(selectedConfig.isSchemaRegistryHttps()) + { + schemaRegistryRestService.setSslSocketFactory(selectedConfig.buildSSlSocketFactory()); + } + + schemaRegistryRestService.configure(configHandler.getSchemaRegistryAuthProperties(selectedConfig)); + jsonTreeView.jsonStringProperty().bind(schemaTextArea.textProperty()); try { versionComboBox.getSelectionModel().selectedItemProperty().addListener(((observable1, oldValue1, newValue1) -> { diff --git a/src/main/java/at/esque/kafka/cluster/ClusterConfig.java b/src/main/java/at/esque/kafka/cluster/ClusterConfig.java index e2e65d0..9683128 100644 --- a/src/main/java/at/esque/kafka/cluster/ClusterConfig.java +++ b/src/main/java/at/esque/kafka/cluster/ClusterConfig.java @@ -6,15 +6,28 @@ import javafx.beans.property.SimpleStringProperty; import 
javafx.beans.property.StringProperty; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManagerFactory; +import java.io.FileInputStream; +import java.io.IOException; +import java.security.*; +import java.security.cert.CertificateException; + public class ClusterConfig { private StringProperty identifier = new SimpleStringProperty(); private StringProperty bootStrapServers = new SimpleStringProperty(); private StringProperty schemaRegistry = new SimpleStringProperty(); + private StringProperty schemaRegistryBasicAuthUserInfo = new SimpleStringProperty(); private BooleanProperty sslEnabled = new SimpleBooleanProperty(); private StringProperty keyStoreLocation = new SimpleStringProperty(); private StringProperty keyStorePassword = new SimpleStringProperty(); private StringProperty trustStoreLocation = new SimpleStringProperty(); private StringProperty trustStorePassword = new SimpleStringProperty(); + private StringProperty saslSecurityProtocol = new SimpleStringProperty(); + private StringProperty saslMechanism = new SimpleStringProperty(); + private StringProperty saslJaasConfig = new SimpleStringProperty(); @JsonProperty("identifier") public String getIdentifier() { @@ -120,8 +133,86 @@ public void setTrustStorePassword(String trustStorePassword) { this.trustStorePassword.set(trustStorePassword); } + + @JsonProperty("saslSecurityProtocol") + public String getSaslSecurityProtocol() { return saslSecurityProtocol.get(); } + + public StringProperty saslSecurityProtocolProperty() { + return saslSecurityProtocol; + } + + public void setSaslSecurityProtocol(String saslSecurityProtocol) { this.saslSecurityProtocol.set(saslSecurityProtocol); } + + @JsonProperty("saslMechanism") + public String getSaslMechanism() { + return saslMechanism.get(); + } + + public StringProperty saslMechanismProperty() { + return saslMechanism; + } + + public void setSaslMechanism(String saslMechanism) { + 
this.saslMechanism.set(saslMechanism); + } + + @JsonProperty("saslJaasConfig") + public String getSaslJaasConfig() { + return saslJaasConfig.get(); + } + + public StringProperty saslJaasConfigProperty() { + return saslJaasConfig; + } + + public void setSaslJaasConfig(String saslJaasConfig) { this.saslJaasConfig.set(saslJaasConfig); } + + @JsonProperty("schemaRegistryBasicAuthUserInfo") + public String getSchemaRegistryBasicAuthUserInfo() { + return schemaRegistryBasicAuthUserInfo.get(); + } + + public StringProperty schemaRegistryBasicAuthUserInfoProperty() { + return schemaRegistryBasicAuthUserInfo; + } + + public void setSchemaRegistryBasicAuthUserInfo(String schemaRegistryBasicAuthUserInfo) { this.schemaRegistryBasicAuthUserInfo.set(schemaRegistryBasicAuthUserInfo); } + + public boolean isSchemaRegistryHttps() + { + return this.getSchemaRegistry().toLowerCase().startsWith("https:"); + } + @Override public String toString(){ return String.format("%s (%s)", getIdentifier(), getBootStrapServers()); } + + + + public SSLSocketFactory buildSSlSocketFactory() { + try { + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + ks.load(new FileInputStream(this.getKeyStoreLocation()), this.getKeyStorePassword().toCharArray()); + + KeyStore ts = KeyStore.getInstance(KeyStore.getDefaultType()); + ts.load(new FileInputStream(this.getTrustStoreLocation()), this.getTrustStorePassword().toCharArray()); + + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ks, this.getKeyStorePassword().toCharArray()); + + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ts); + + SSLContext sc = SSLContext.getInstance("TLSv1.2"); + sc.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + + return sc.getSocketFactory(); + + } catch (KeyStoreException | IOException | NoSuchAlgorithmException | CertificateException | UnrecoverableKeyException | 
KeyManagementException e) { + at.esque.kafka.alerts.ErrorAlert.show(e); + return null; + } + } + } diff --git a/src/main/java/at/esque/kafka/cluster/KafkaesqueAdminClient.java b/src/main/java/at/esque/kafka/cluster/KafkaesqueAdminClient.java index 5798a1e..eb4982c 100644 --- a/src/main/java/at/esque/kafka/cluster/KafkaesqueAdminClient.java +++ b/src/main/java/at/esque/kafka/cluster/KafkaesqueAdminClient.java @@ -35,11 +35,12 @@ public class KafkaesqueAdminClient { private AdminClient adminClient; - public KafkaesqueAdminClient(String bootstrapServers, Map sslProps) { + public KafkaesqueAdminClient(String bootstrapServers, Map sslProps, Map saslProps) { Properties props = new Properties(); props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, String.format("kafkaesque-%s", UUID.randomUUID())); props.putAll(sslProps); + props.putAll(saslProps); this.adminClient = AdminClient.create(props); } diff --git a/src/main/java/at/esque/kafka/handlers/ConfigHandler.java b/src/main/java/at/esque/kafka/handlers/ConfigHandler.java index 7f3eed7..b41d031 100644 --- a/src/main/java/at/esque/kafka/handlers/ConfigHandler.java +++ b/src/main/java/at/esque/kafka/handlers/ConfigHandler.java @@ -8,8 +8,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; import com.google.inject.Singleton; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -181,27 +183,74 @@ public Map getSslProperties(ClusterConfig config) { if (config.isSslEnabled()) { props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); } + + if (config.isSchemaRegistryHttps()) { + 
props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + } + if (StringUtils.isNotEmpty(config.getKeyStoreLocation())) { String keyStoreLocation = getJksStoreLocation(config.getIdentifier(), config.getKeyStoreLocation()); if (keyStoreLocation != null) { props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreLocation); + props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreLocation); + if (StringUtils.isNotEmpty(config.getKeyStorePassword())) { props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, config.getKeyStorePassword()); + props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, config.getKeyStorePassword()); } } } if (StringUtils.isNotEmpty(config.getTrustStoreLocation())) { String trustStoreLocation = getJksStoreLocation(config.getIdentifier(), config.getTrustStoreLocation()); + if (trustStoreLocation != null) { props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation); + props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation); + if (StringUtils.isNotEmpty(config.getTrustStorePassword())) { props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, config.getTrustStorePassword()); + props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, config.getTrustStorePassword()); } } } return props; } + + public Map getSaslProperties(ClusterConfig config) { + Map props = new HashMap<>(); + + if (StringUtils.isNoneEmpty(config.getSaslSecurityProtocol())) + { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,config.getSaslSecurityProtocol()); + } + + if (StringUtils.isNotEmpty(config.getSaslMechanism())) + { + props.put(SaslConfigs.SASL_MECHANISM,config.getSaslMechanism()); + } + + + if (StringUtils.isNotEmpty(config.getSaslJaasConfig())) + { + 
props.put(SaslConfigs.SASL_JAAS_CONFIG,config.getSaslJaasConfig()); + } + + return props; + } + + public Map getSchemaRegistryAuthProperties(ClusterConfig config) { + Map props = new HashMap<>(); + + if (StringUtils.isNoneEmpty(config.getSchemaRegistryBasicAuthUserInfo())) + { + props.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE,"USER_INFO"); + props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SchemaRegistryClientConfig.USER_INFO_CONFIG, config.getSchemaRegistryBasicAuthUserInfo()); + } + + return props; + } + private String getJksStoreLocation(String clusterIdentification, String location) { File jksStore = new File(location); if (jksStore.exists() && jksStore.isFile()) { diff --git a/src/main/java/at/esque/kafka/handlers/ConsumerHandler.java b/src/main/java/at/esque/kafka/handlers/ConsumerHandler.java index ac63442..b86bd99 100644 --- a/src/main/java/at/esque/kafka/handlers/ConsumerHandler.java +++ b/src/main/java/at/esque/kafka/handlers/ConsumerHandler.java @@ -74,6 +74,8 @@ public UUID registerConsumer(ClusterConfig config, TopicMessageTypeConfig topicM consumerProps.setProperty("schema.registry.url", config.getSchemaRegistry()); } consumerProps.putAll(configHandler.getSslProperties(config)); + consumerProps.putAll(configHandler.getSaslProperties(config)); + consumerProps.putAll(configHandler.getSchemaRegistryAuthProperties(config)); consumerProps.putAll(consumerConfigs); LOGGER.info("Creating new Consumer with properties: [{}]", consumerProps); diff --git a/src/main/java/at/esque/kafka/handlers/ProducerHandler.java b/src/main/java/at/esque/kafka/handlers/ProducerHandler.java index 2b22152..4c4268d 100644 --- a/src/main/java/at/esque/kafka/handlers/ProducerHandler.java +++ b/src/main/java/at/esque/kafka/handlers/ProducerHandler.java @@ -72,9 +72,19 @@ public UUID registerProducer(ClusterConfig clusterConfig) throws IOException { props.put("kafkaesque.confighandler", configHandler); if 
(StringUtils.isNotEmpty(clusterConfig.getSchemaRegistry())) { props.setProperty("schema.registry.url", clusterConfig.getSchemaRegistry()); + props.putAll(configHandler.getSchemaRegistryAuthProperties(clusterConfig)); schemaRegistryRestService = new RestService(clusterConfig.getSchemaRegistry()); + + schemaRegistryRestService.configure(configHandler.getSchemaRegistryAuthProperties(clusterConfig)); + + if (clusterConfig.isSchemaRegistryHttps()) + { + schemaRegistryRestService.setSslSocketFactory(clusterConfig.buildSSlSocketFactory()); + } } + props.putAll(configHandler.getSslProperties(clusterConfig)); + props.putAll(configHandler.getSaslProperties(clusterConfig)); props.putAll(configHandler.readProducerConfigs(clusterConfig.getIdentifier())); LOGGER.info("Creating new Producer with properties: [{}]", props); diff --git a/src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java b/src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java index 3409c8a..da028df 100644 --- a/src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java +++ b/src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java @@ -28,19 +28,16 @@ import org.apache.avro.io.parsing.Parser; import org.apache.avro.io.parsing.Symbol; import org.apache.avro.util.Utf8; -import org.codehaus.jackson.Base64Variant; -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.JsonLocation; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonStreamContext; -import org.codehaus.jackson.JsonToken; -import org.codehaus.jackson.ObjectCodec; -import org.codehaus.jackson.node.NullNode; +import com.fasterxml.jackson.core.*; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import javax.validation.constraints.Null; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import 
java.lang.reflect.Method; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; @@ -593,6 +590,11 @@ public void setCodec(ObjectCodec c) { throw new UnsupportedOperationException(); } + @Override + public Version version() { + throw new UnsupportedOperationException(); + } + @Override public void close() throws IOException { throw new UnsupportedOperationException(); @@ -604,6 +606,11 @@ public JsonToken nextToken() throws IOException { return elements.get(pos).token; } + @Override + public JsonToken nextValue() throws IOException { + throw new UnsupportedOperationException(); + } + @Override public JsonParser skipChildren() throws IOException { JsonToken tkn = elements.get(pos).token; @@ -668,6 +675,11 @@ public int getTextOffset() throws IOException { throw new UnsupportedOperationException(); } + @Override + public boolean hasTextCharacters() { + throw new UnsupportedOperationException(); + } + @Override public Number getNumberValue() throws IOException { throw new UnsupportedOperationException(); @@ -714,10 +726,50 @@ public byte[] getBinaryValue(Base64Variant b64variant) throw new UnsupportedOperationException(); } + @Override + public String getValueAsString(String def) throws IOException { + return getText(); + } + @Override public JsonToken getCurrentToken() { return elements.get(pos).token; } + + @Override + public int getCurrentTokenId() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasCurrentToken() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasTokenId(int id) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasToken(JsonToken t) { + return false; + } + + @Override + public void clearCurrentToken() { + + } + + @Override + public JsonToken getLastClearedToken() { + return null; + } + + @Override + public void overrideCurrentName(String name) { + + } }; } @@ -733,7 +785,25 @@ private void 
injectDefaultValueIfAvailable(final JsonParser in, String fieldName 
 boolean isNull = field == null; 
- JsonNode defVal = isNull ? NullNode.getInstance() : field.defaultValue(); 
+ /* Avro 1.9 changed the .defaultValue method to private - defaultVal should be used instead, but that already returns 
+ * an Object and not the JsonNode as the defaultValue method did. It was quite hard to do a proper change of the method 
+ * due to a lack of understanding and tests, therefore we just used reflection to keep it working as it worked before 
+ */ 
+ JsonNode defVal = NullNode.getInstance(); 
+ try{ 
+ Method method = field.getClass().getDeclaredMethod("defaultValue"); 
+ method.setAccessible(true); 
+ 
+ defVal = isNull ? NullNode.getInstance() : (JsonNode) method.invoke(field); //.defaultVal is the new one 
+ } catch (NoSuchMethodException e) { 
+ e.printStackTrace(); 
+ } catch (IllegalAccessException e) { 
+ e.printStackTrace(); 
+ } catch (InvocationTargetException e) { 
+ e.printStackTrace(); 
+ } 
+ 
+ 
 if (defVal == null) { 
 throw new AvroTypeException("Expected field name not found: " + fieldName); 
 } 
@@ -754,6 +824,7 @@ private void injectDefaultValueIfAvailable(final JsonParser in, String fieldName 
 } 
 currentReorderBuffer.origParser = in; 
 this.in = makeParser(result); 
+ 
 } 
 
 private static Field findField(Schema schema, String name) {