Skip to content

Commit

Permalink
NIFI-4777: get schema by id even if not latest
Browse files Browse the repository at this point in the history
This closes #2405.

Signed-off-by: Mark Payne <markap14@hotmail.com>
  • Loading branch information
Internet authored and markap14 committed Mar 19, 2018
1 parent 4687e0f commit 1091093
Showing 1 changed file with 57 additions and 33 deletions.
Expand Up @@ -33,6 +33,7 @@

import javax.net.ssl.SSLContext;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
Expand All @@ -41,8 +42,7 @@
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;


/**
* <p>
Expand All @@ -64,9 +64,8 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
private static final String VERSION_FIELD_NAME = "version";
private static final String ID_FIELD_NAME = "id";
private static final String SCHEMA_TEXT_FIELD_NAME = "schema";

private final ConcurrentMap<String, Integer> schemaNameToIdentifierMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, String> schemaIdentifierToNameMap = new ConcurrentHashMap<>();
private static final String CONTENT_TYPE_HEADER = "Content-Type";
private static final String SCHEMA_REGISTRY_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json";


public RestSchemaRegistryClient(final List<String> baseUrls, final int timeoutMillis, final SSLContext sslContext, final ComponentLog logger) {
Expand Down Expand Up @@ -100,41 +99,30 @@ public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotF
// To make this more efficient, we will cache a mapping of Schema Name to identifier, so that we can look this up more efficiently.

// Check if we have cached the Identifier to Name mapping
final String schemaName = schemaIdentifierToNameMap.get(schemaId);
if (schemaName != null) {
return getSchema(schemaName);
}

final String schemaDescription = "identifier " + schemaId;
final JsonNode schemaNameArray = fetchJsonResponse("/subjects", schemaDescription);
if (!schemaNameArray.isArray()) {
throw new IOException("When determining Subjects that are available, expected a JSON Array but did not receive a valid response");
}

final ArrayNode arrayNode = (ArrayNode) schemaNameArray;
for (final JsonNode node : arrayNode) {
final String nodeName = node.asText();
final String schemaPath = getSchemaPath(schemaId);
final JsonNode responseJson = fetchJsonResponse(schemaPath, "id " + schemaId);
final JsonNode subjectsJson = fetchJsonResponse("/subjects", "subjects array");
final ArrayNode subjectsList = (ArrayNode) subjectsJson;

final String schemaPath = getSubjectPath(nodeName);
final JsonNode schemaNode;
JsonNode completeSchema = null;
for (JsonNode subject: subjectsList) {
try {
schemaNode = fetchJsonResponse(schemaPath, schemaDescription);
} catch (final SchemaNotFoundException | IOException e) {
logger.warn("Failed to fetch Schema with name '{}' from Confluent Schema Registry; "
+ "will skip this schema and continue attempting to retrieve other schemas", new Object[] {nodeName, e});
final String subjectName = subject.asText();
completeSchema = postJsonResponse("/subjects/" + subjectName, responseJson, "schema id: " + schemaId);
break;
} catch (SchemaNotFoundException e) {
continue;
}

final int id = schemaNode.get(ID_FIELD_NAME).asInt();
schemaNameToIdentifierMap.put(nodeName, id);
schemaIdentifierToNameMap.put(id, nodeName);
}

if (id == schemaId) {
return createRecordSchema(schemaNode);
}
if(completeSchema == null) {
throw new SchemaNotFoundException("could not get schema with id: " + schemaId);
}

throw new SchemaNotFoundException("Could not find a schema with identifier " + schemaId);
final RecordSchema recordSchema = createRecordSchema(completeSchema);
return recordSchema;
}

private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException {
Expand All @@ -159,11 +147,39 @@ private String getSubjectPath(final String schemaName) throws UnsupportedEncodin
return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/latest";
}

private String getSchemaPath(final int schemaId) throws UnsupportedEncodingException {
return "/schemas/ids/" + URLEncoder.encode(String.valueOf(schemaId), "UTF-8");
}

private JsonNode postJsonResponse(final String pathSuffix, final JsonNode schema, final String schemaDescription) throws SchemaNotFoundException {
String errorMessage = null;
for(final String baseUrl: baseUrls) {
final String path = getPath(pathSuffix);
final String trimmedBase = getTrimmedBase(baseUrl);
final String url = trimmedBase + path;
final WebTarget builder = client.target(url);
final Response response = builder.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE).post(Entity.json(schema.toString()));
final int responseCode = response.getStatus();

if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
continue;
}

if(responseCode == Response.Status.OK.getStatusCode()) {
final JsonNode responseJson = response.readEntity(JsonNode.class);
return responseJson;
}
}

throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: "
+ errorMessage);
}

private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException, IOException {
String errorMessage = null;
for (final String baseUrl : baseUrls) {
final String path = pathSuffix.startsWith("/") ? pathSuffix : "/" + pathSuffix;
final String trimmedBase = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
final String path = getPath(pathSuffix);
final String trimmedBase = getTrimmedBase(baseUrl);
final String url = trimmedBase + path;

final WebTarget webTarget = client.target(url);
Expand All @@ -187,4 +203,12 @@ private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaD
throw new IOException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage);
}

private String getTrimmedBase(String baseUrl) {
return baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
}

private String getPath(String pathSuffix) {
return pathSuffix.startsWith("/") ? pathSuffix : "/" + pathSuffix;
}

}

0 comments on commit 1091093

Please sign in to comment.