From 89c5a128c10e658d9bb95bc0c869d884bd4d202f Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 30 Oct 2017 11:59:55 -0400 Subject: [PATCH] NIFI-4459: Catch Exception if trying to iterate over many confluent schemas and unable to load one; in this case log a WARNING and continue on; also updated Jersey client to newest --- .../pom.xml | 10 +++++-- .../ConfluentSchemaRegistry.java | 2 +- .../client/RestSchemaRegistryClient.java | 27 ++++++++++++++----- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml index 69f09b38654f..f762393615d2 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml @@ -9,7 +9,8 @@ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> - + 4.0.0 @@ -48,11 +49,16 @@ jersey-client ${jersey.version} + + org.glassfish.jersey.inject + jersey-hk2 + ${jersey.version} + org.apache.nifi nifi-web-utils - + junit junit diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java index 070be7594056..113e0965473f 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java @@ -137,7 +137,7 @@ public void onEnabled(final ConfigurationContext context) { sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED); } - final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext); + final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext, getLogger()); final int cacheSize = context.getProperty(CACHE_SIZE).asInteger(); final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS).longValue(); diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java index 76dd43a4b7ea..b2ad19b57d9f 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java @@ -20,12 +20,14 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.web.util.WebUtils; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.node.ArrayNode; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; + import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientProperties; @@ -53,8 +55,10 @@ *

*/ public class RestSchemaRegistryClient implements SchemaRegistryClient { + private final List baseUrls; private final Client client; + private final ComponentLog logger; private static final String SUBJECT_FIELD_NAME = "subject"; private static final String VERSION_FIELD_NAME = "version"; @@ -65,13 +69,15 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { private final ConcurrentMap schemaIdentifierToNameMap = new ConcurrentHashMap<>(); - public RestSchemaRegistryClient(final List baseUrls, final int timeoutMillis, final SSLContext sslContext) { + public RestSchemaRegistryClient(final List baseUrls, final int timeoutMillis, final SSLContext sslContext, final ComponentLog logger) { this.baseUrls = new ArrayList<>(baseUrls); final ClientConfig clientConfig = new ClientConfig(); clientConfig.property(ClientProperties.CONNECT_TIMEOUT, timeoutMillis); clientConfig.property(ClientProperties.READ_TIMEOUT, timeoutMillis); client = WebUtils.createClient(clientConfig, sslContext); + + this.logger = logger; } @@ -107,10 +113,17 @@ public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotF final ArrayNode arrayNode = (ArrayNode) schemaNameArray; for (final JsonNode node : arrayNode) { - final String nodeName = node.getTextValue(); + final String nodeName = node.asText(); final String schemaPath = getSubjectPath(nodeName); - final JsonNode schemaNode = fetchJsonResponse(schemaPath, schemaDescription); + final JsonNode schemaNode; + try { + schemaNode = fetchJsonResponse(schemaPath, schemaDescription); + } catch (final SchemaNotFoundException | IOException e) { + logger.warn("Failed to fetch Schema with name '{}' from Confluent Schema Registry; " + + "will skip this schema and continue attempting to retrieve other schemas", new Object[] {nodeName, e}); + continue; + } final int id = schemaNode.get(ID_FIELD_NAME).asInt(); schemaNameToIdentifierMap.put(nodeName, id); @@ -125,10 +138,10 @@ public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotF } private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException { - final String subject = schemaNode.get(SUBJECT_FIELD_NAME).getTextValue(); + final String subject = schemaNode.get(SUBJECT_FIELD_NAME).asText(); final int version = schemaNode.get(VERSION_FIELD_NAME).asInt(); final int id = schemaNode.get(ID_FIELD_NAME).asInt(); - final String schemaText = schemaNode.get(SCHEMA_TEXT_FIELD_NAME).getTextValue(); + final String schemaText = schemaNode.get(SCHEMA_TEXT_FIELD_NAME).asText(); try { final Schema avroSchema = new Schema.Parser().parse(schemaText);