From 18b1c9965a1f31962815f3a63d90d1bbf8253a34 Mon Sep 17 00:00:00 2001 From: Internet Date: Mon, 15 Jan 2018 13:35:43 +0200 Subject: [PATCH] get schema subject by id --- .../client/RestSchemaRegistryClient.java | 89 ++++++++++++------- 1 file changed, 56 insertions(+), 33 deletions(-) diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java index b2ad19b57d9f..cafac1188f3e 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java @@ -33,6 +33,7 @@ import javax.net.ssl.SSLContext; import javax.ws.rs.client.Client; +import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -41,8 +42,7 @@ import java.net.URLEncoder; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; + /** *

@@ -64,9 +64,8 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { private static final String VERSION_FIELD_NAME = "version"; private static final String ID_FIELD_NAME = "id"; private static final String SCHEMA_TEXT_FIELD_NAME = "schema"; - - private final ConcurrentMap schemaNameToIdentifierMap = new ConcurrentHashMap<>(); - private final ConcurrentMap schemaIdentifierToNameMap = new ConcurrentHashMap<>(); + private static final String CONTENT_TYPE_HEADER = "Content-Type"; + private static final String SCHEMA_REGISTRY_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"; public RestSchemaRegistryClient(final List baseUrls, final int timeoutMillis, final SSLContext sslContext, final ComponentLog logger) { @@ -100,41 +99,30 @@ public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotF // To make this more efficient, we will cache a mapping of Schema Name to identifier, so that we can look this up more efficiently. // Check if we have cached the Identifier to Name mapping - final String schemaName = schemaIdentifierToNameMap.get(schemaId); - if (schemaName != null) { - return getSchema(schemaName); - } - - final String schemaDescription = "identifier " + schemaId; - final JsonNode schemaNameArray = fetchJsonResponse("/subjects", schemaDescription); - if (!schemaNameArray.isArray()) { - throw new IOException("When determining Subjects that are available, expected a JSON Array but did not receive a valid response"); - } - final ArrayNode arrayNode = (ArrayNode) schemaNameArray; - for (final JsonNode node : arrayNode) { - final String nodeName = node.asText(); + final String schemaPath = getSchemaPath(schemaId); + final JsonNode responseJson = fetchJsonResponse(schemaPath, "id " + schemaId); + final JsonNode subjectsJson = fetchJsonResponse("/subjects", "subjects array"); + final ArrayNode subjectsList = (ArrayNode) subjectsJson; - final String schemaPath = getSubjectPath(nodeName); - final JsonNode schemaNode; + JsonNode completeSchema = null; + for (JsonNode subject: subjectsList) { try { - schemaNode = fetchJsonResponse(schemaPath, schemaDescription); - } catch (final SchemaNotFoundException | IOException e) { - logger.warn("Failed to fetch Schema with name '{}' from Confluent Schema Registry; " - + "will skip this schema and continue attempting to retrieve other schemas", new Object[] {nodeName, e}); + final String subjectName = subject.asText(); + completeSchema = postJsonResponse("/subjects/" + subjectName, responseJson, "schema id: " + schemaId); + break; + } catch (SchemaNotFoundException e) { continue; } - final int id = schemaNode.get(ID_FIELD_NAME).asInt(); - schemaNameToIdentifierMap.put(nodeName, id); - schemaIdentifierToNameMap.put(id, nodeName); + } - if (id == schemaId) { - return createRecordSchema(schemaNode); - } + if(completeSchema == null) { + throw new SchemaNotFoundException("could not get schema with id: " + schemaId); } - throw new SchemaNotFoundException("Could not find a schema with identifier " + schemaId); + final RecordSchema recordSchema = createRecordSchema(completeSchema); + return recordSchema; } private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException { @@ -159,11 +147,38 @@ private String getSubjectPath(final String schemaName) throws UnsupportedEncodin return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/latest"; } + private String getSchemaPath(final int schemaId) throws UnsupportedEncodingException { + return "/schemas/ids/" + URLEncoder.encode(String.valueOf(schemaId), "UTF-8"); + } + + private JsonNode postJsonResponse(final String pathSuffix, final JsonNode schema, final String schemaDescription) throws SchemaNotFoundException { + String errorMessage = null; + for(final String baseUrl: baseUrls) { + final String path = getPath(pathSuffix); + final String trimmedBase = getTrimmedBase(baseUrl); + final String url = trimmedBase + path; + final WebTarget builder = client.target(url); + final Response response = builder.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE).post(Entity.json(schema.toString())); + final int responseCode = response.getStatus(); + + if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) { + continue; + } + + if(responseCode == Response.Status.OK.getStatusCode()) { + final JsonNode responseJson = response.readEntity(JsonNode.class); + return responseJson; + } + } + + throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage); + } + private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException, IOException { String errorMessage = null; for (final String baseUrl : baseUrls) { - final String path = pathSuffix.startsWith("/") ? pathSuffix : "/" + pathSuffix; - final String trimmedBase = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl; + final String path = getPath(pathSuffix); + final String trimmedBase = getTrimmedBase(baseUrl); final String url = trimmedBase + path; final WebTarget webTarget = client.target(url); @@ -187,4 +202,12 @@ private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaD throw new IOException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage); } + private String getTrimmedBase(String baseUrl) { + return baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl; + } + + private String getPath(String pathSuffix) { + return pathSuffix.startsWith("/") ? pathSuffix : "/" + pathSuffix; + } + }