Skip to content

Commit

Permalink
NIFI-6011 support for retrieving named schema versions
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Savitsky committed Mar 1, 2019
1 parent 5249a85 commit 0602a55
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 20 deletions.
Expand Up @@ -185,7 +185,13 @@ private RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifie
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present");
}

final RecordSchema schema = client.getSchema(schemaName.get());
final RecordSchema schema;
if (schemaIdentifier.getVersion().isPresent()) {
schema = client.getSchema(schemaName.get(), schemaIdentifier.getVersion().getAsInt());
} else {
schema = client.getSchema(schemaName.get());
}

return schema;
}

Expand Down
Expand Up @@ -19,17 +19,16 @@

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.IOException;
import java.time.Duration;


public class CachingSchemaRegistryClient implements SchemaRegistryClient {
private final SchemaRegistryClient client;

private final LoadingCache<String, RecordSchema> nameCache;
private final LoadingCache<Pair<String, Integer>, RecordSchema> nameVersionCache;
private final LoadingCache<Integer, RecordSchema> idCache;


Expand All @@ -40,19 +39,28 @@ public CachingSchemaRegistryClient(final SchemaRegistryClient toWrap, final int
.maximumSize(cacheSize)
.expireAfterWrite(Duration.ofNanos(expirationNanos))
.build(client::getSchema);
nameVersionCache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.expireAfterWrite(Duration.ofNanos(expirationNanos))
.build(key -> client.getSchema(key.getLeft(), key.getRight()));
idCache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.expireAfterWrite(Duration.ofNanos(expirationNanos))
.build(client::getSchema);
}

@Override
public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException {
public RecordSchema getSchema(final String schemaName) {
return nameCache.get(schemaName);
}

@Override
public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException {
public RecordSchema getSchema(String schemaName, int version) {
return nameVersionCache.get(Pair.of(schemaName, version));
}

@Override
public RecordSchema getSchema(final int schemaId) {
return idCache.get(schemaId);
}

Expand Down
Expand Up @@ -82,13 +82,19 @@ public RestSchemaRegistryClient(final List<String> baseUrls, final int timeoutMi

@Override
public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException {
final String pathSuffix = getSubjectPath(schemaName);
final String pathSuffix = getSubjectPath(schemaName, null);
final JsonNode responseJson = fetchJsonResponse(pathSuffix, "name " + schemaName);

final RecordSchema recordSchema = createRecordSchema(responseJson);
return recordSchema;
return createRecordSchema(responseJson);
}

@Override
public RecordSchema getSchema(final String schemaName, final int schemaVersion) throws IOException, SchemaNotFoundException {
final String pathSuffix = getSubjectPath(schemaName, schemaVersion);
final JsonNode responseJson = fetchJsonResponse(pathSuffix, "name " + schemaName);

return createRecordSchema(responseJson);
}

@Override
public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException {
Expand Down Expand Up @@ -121,8 +127,7 @@ public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotF
throw new SchemaNotFoundException("could not get schema with id: " + schemaId);
}

final RecordSchema recordSchema = createRecordSchema(completeSchema);
return recordSchema;
return createRecordSchema(completeSchema);
}

private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException {
Expand All @@ -133,18 +138,18 @@ private RecordSchema createRecordSchema(final JsonNode schemaNode) throws Schema

try {
final Schema avroSchema = new Schema.Parser().parse(schemaText);
final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id(Long.valueOf(id)).version(version).build();
final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id((long) id).version(version).build();

final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
return recordSchema;
return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
} catch (final SchemaParseException spe) {
throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + subject
+ " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
}
}

private String getSubjectPath(final String schemaName) throws UnsupportedEncodingException {
return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/latest";
private String getSubjectPath(final String schemaName, final Integer schemaVersion) throws UnsupportedEncodingException {
return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/" +
(schemaVersion == null ? "latest" : URLEncoder.encode(String.valueOf(schemaVersion), "UTF-8"));
}

private String getSchemaPath(final int schemaId) throws UnsupportedEncodingException {
Expand All @@ -166,8 +171,7 @@ private JsonNode postJsonResponse(final String pathSuffix, final JsonNode schema
}

if(responseCode == Response.Status.OK.getStatusCode()) {
final JsonNode responseJson = response.readEntity(JsonNode.class);
return responseJson;
return response.readEntity(JsonNode.class);
}
}

Expand All @@ -187,8 +191,7 @@ private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaD
final int responseCode = response.getStatus();

if (responseCode == Response.Status.OK.getStatusCode()) {
final JsonNode responseJson = response.readEntity(JsonNode.class);
return responseJson;
return response.readEntity(JsonNode.class);
}

if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
Expand Down
Expand Up @@ -26,5 +26,7 @@ public interface SchemaRegistryClient {

RecordSchema getSchema(String schemaName) throws IOException, SchemaNotFoundException;

RecordSchema getSchema(String schemaName, int version) throws IOException, SchemaNotFoundException;

RecordSchema getSchema(int schemaId) throws IOException, SchemaNotFoundException;
}

0 comments on commit 0602a55

Please sign in to comment.