Skip to content

Commit

Permalink
DGS-8542 Add support for JSON Schema Draft 2020-12 (#2781)
Browse files Browse the repository at this point in the history
* Fix findbugs

* Fix checkstyle

* Incorporate review feedback

* Fix checkstyle
  • Loading branch information
rayokota committed Oct 10, 2023
1 parent da3ba87 commit 8944db1
Show file tree
Hide file tree
Showing 21 changed files with 7,275 additions and 51 deletions.
8 changes: 4 additions & 4 deletions checkstyle/suppressions.xml
Expand Up @@ -13,7 +13,7 @@
files="(AbstractKafkaProtobufSerializer|DynamicSchema|MessageDefinition|ProtobufSchema|Rule|RuleContext|SchemaRegistryCoordinator).java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(AzureKmsClient|AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AbstractKafkaSchemaSerDe|CachedSchemaRegistryClient|MockSchemaRegistryClient|RestService|Errors|SchemaRegistryRestApplication|Context|KafkaSchemaRegistry|KafkaStore|AvroConverter|AvroData|AvroSchemaUtils|KafkaGroupLeaderElector|ProtobufSchema|ProtobufData|JsonSchemaData|InMemoryCache|SchemaMessageReader|Jackson|JsonSchemaConverter|MetricsContainer|ProtobufSchema|ProtobufSchemaUtils|MetadataEncoderService|ProtoFileElementDeserializer|DekRegistry).java"/>
files="(AzureKmsClient|AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AbstractKafkaSchemaSerDe|CachedSchemaRegistryClient|MockSchemaRegistryClient|RestService|Errors|SchemaRegistryRestApplication|Context|KafkaSchemaRegistry|KafkaStore|AvroConverter|AvroData|AvroSchemaUtils|KafkaGroupLeaderElector|JsonSchema|ProtobufSchema|ProtobufData|JsonSchemaData|InMemoryCache|SchemaMessageReader|Jackson|JsonSchemaConverter|MetricsContainer|ProtobufSchema|ProtobufSchemaUtils|MetadataEncoderService|ProtoFileElementDeserializer|DekRegistry).java"/>

<suppress checks="ClassFanOutComplexity"
files="(RestService|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|AvroData|KafkaGroupLeaderElector).java"/>
Expand All @@ -22,13 +22,13 @@
files="(Errors|AvroMessageReader).java"/>

<suppress checks="CyclomaticComplexity"
files="(AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AbstractKafkaSchemaSerDe|AvroSchema|AvroSchemaUtils|CompatibilityResource|Config|ConfigResource|ConfigUpdateRequest|ConfigValue|Context|ContextKey|KafkaSchemaRegistry|KafkaStore|KafkaStoreMessageHandler|KafkaStoreReaderThread|AvroData|DownloadSchemaRegistryMojo|MockSchemaRegistryClient|SchemaRegistrySerializer|SchemaValue|SubjectVersionsResource|ProtobufSchema|SchemaDiff|FieldSchemaDiff|MessageSchemaDiff|DynamicSchema|SchemaMessageFormatter|ProtobufData|JsonSchema|JSON.*|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaJsonSchemaSerializer|JsonSchemaData|JsonSchemaUtils|MessageDefinition|ProtobufSchemaUtils|SchemaMessageReader|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|SubjectKeyComparator|ContextFilter|QualifiedSubject|Schema|AvroTypeDescription|CelExecutor|DataEncryptionKeyId|EncryptionKeyId|EncryptionUpdateRequestHandler|FieldEncryptionExecutor|FieldRuleExecutor|Rule|WildcardMatcher|JsonSchemaComparator|DlqAction|LocalSchemaRegistryClient|RetryExecutor).java"/>
files="(AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AbstractKafkaSchemaSerDe|AvroSchema|AvroSchemaUtils|CompatibilityResource|Config|ConfigResource|ConfigUpdateRequest|ConfigValue|Context|ContextKey|KafkaSchemaRegistry|KafkaStore|KafkaStoreMessageHandler|KafkaStoreReaderThread|AvroData|DownloadSchemaRegistryMojo|MockSchemaRegistryClient|SchemaRegistrySerializer|SchemaValue|SubjectVersionsResource|ProtobufSchema|SchemaDiff|FieldSchemaDiff|MessageSchemaDiff|DynamicSchema|SchemaMessageFormatter|ProtobufData|JsonSchema|JSON.*|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaJsonSchemaSerializer|JsonSchemaData|JsonSchemaUtils|MessageDefinition|ProtobufSchemaUtils|SchemaMessageReader|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|SubjectKeyComparator|ContextFilter|QualifiedSubject|Schema|AvroTypeDescription|CelExecutor|DataEncryptionKeyId|EncryptionKeyId|EncryptionUpdateRequestHandler|FieldEncryptionExecutor|FieldRuleExecutor|Rule|WildcardMatcher|JsonSkemaArrayDeserializer|JsonSkemaArraySerializer|JsonSkemaObjectDeserializer|JsonSkemaObjectSerializer|JsonSchemaComparator|DlqAction|LocalSchemaRegistryClient|RetryExecutor|SchemaTranslator|SchemaUtils).java"/>

<suppress checks="NPathComplexity"
files="(AvroData|ConfigResource|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|MessageDefinition|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaSchemaSerDe|AbstractKafkaAvroSerializer|AbstractKafkaAvroDeserializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|ContextFilter|QualifiedSubject|SubjectVersionsResource|Rule|WildcardMatcher|JsonSchemaComparator|LocalSchemaRegistryClient|DataEncryptionKeyId|FieldEncryptionExecutor).java"/>
files="(AvroData|ConfigResource|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|MessageDefinition|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaSchemaSerDe|AbstractKafkaAvroSerializer|AbstractKafkaAvroDeserializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|ContextFilter|QualifiedSubject|SubjectVersionsResource|Rule|WildcardMatcher|JsonSchemaComparator|LocalSchemaRegistryClient|DataEncryptionKeyId|FieldEncryptionExecutor|SchemaTranslator|SchemaUtils).java"/>

<suppress checks="JavaNCSS"
files="(AbstractKafkaAvroSerializer|AbstractKafkaJsonSchemaSerializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|AbstractKafkaSchemaSerDe|AvroData|AvroSchema|AvroSchemaUtils|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchema|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema|ProtobufSchemaUtils|JsonSchemaComparator|SchemaMessageFormatter|SchemaMessageReader).java"/>
files="(AbstractKafkaAvroSerializer|AbstractKafkaJsonSchemaSerializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|AbstractKafkaSchemaSerDe|AvroData|AvroSchema|AvroSchemaUtils|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchema|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema|ProtobufSchemaUtils|JsonSchemaComparator|SchemaMessageFormatter|SchemaMessageReader|SchemaTranslator).java"/>

<suppress checks="MethodLength"
files="(AvroData|ProtobufSchema|ProtobufSchemaUtils).java"/>
Expand Down
5 changes: 5 additions & 0 deletions findbugs/findbugs-exclude.xml
Expand Up @@ -102,6 +102,11 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
<Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE"/>
</Match>

<Match>
<Class name="io.confluent.kafka.schemaregistry.json.schema.SchemaTranslator"/>
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
</Match>

<Match>
<Class name="~io.confluent.kafka.schemaregistry.client.security.SslFactory.*"/>
<Bug pattern="THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION"/>
Expand Down
4 changes: 4 additions & 0 deletions json-schema-provider/pom.xml
Expand Up @@ -35,6 +35,10 @@
<groupId>com.github.erosb</groupId>
<artifactId>everit-json-schema</artifactId>
</dependency>
<dependency>
<groupId>com.github.erosb</groupId>
<artifactId>json-sKema</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
Expand Down
Expand Up @@ -35,8 +35,24 @@
import com.fasterxml.jackson.databind.node.TextNode;
import com.fasterxml.jackson.databind.ser.BeanPropertyWriter;
import com.fasterxml.jackson.databind.ser.PropertyWriter;
import com.github.erosb.jsonsKema.IJsonValue;
import com.github.erosb.jsonsKema.JsonArray;
import com.github.erosb.jsonsKema.JsonBoolean;
import com.github.erosb.jsonsKema.JsonNull;
import com.github.erosb.jsonsKema.JsonNumber;
import com.github.erosb.jsonsKema.JsonObject;
import com.github.erosb.jsonsKema.JsonParser;
import com.github.erosb.jsonsKema.JsonString;
import com.github.erosb.jsonsKema.JsonValue;
import com.github.erosb.jsonsKema.SchemaClient;
import com.github.erosb.jsonsKema.SchemaLoaderConfig;
import com.github.erosb.jsonsKema.SchemaLoadingException;
import com.github.erosb.jsonsKema.UnknownSource;
import com.github.erosb.jsonsKema.ValidationFailure;
import com.github.erosb.jsonsKema.Validator;
import com.google.common.collect.Lists;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaEntity;
import io.confluent.kafka.schemaregistry.json.schema.SchemaTranslator;
import io.confluent.kafka.schemaregistry.rules.FieldTransform;
import io.confluent.kafka.schemaregistry.rules.RuleContext;
import io.confluent.kafka.schemaregistry.rules.RuleContext.FieldContext;
Expand All @@ -45,8 +61,16 @@
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
Expand All @@ -65,9 +89,9 @@
import org.everit.json.schema.ReferenceSchema;
import org.everit.json.schema.Schema;
import org.everit.json.schema.StringSchema;
import org.everit.json.schema.TrueSchema;
import org.everit.json.schema.ValidationException;
import org.everit.json.schema.loader.SchemaLoader;
import org.everit.json.schema.loader.SpecificationVersion;
import org.everit.json.schema.loader.internal.ReferenceResolver;
import org.json.JSONArray;
import org.json.JSONObject;
Expand Down Expand Up @@ -95,6 +119,8 @@ public class JsonSchema implements ParsedSchema {

private static final Logger log = LoggerFactory.getLogger(JsonSchema.class);

public static final String DEFAULT_BASE_URI = "mem://input";

public static final String TYPE = "JSON";

public static final String TAGS = "confluent:tags";
Expand All @@ -107,6 +133,8 @@ public class JsonSchema implements ParsedSchema {

private transient Schema schemaObj;

private transient com.github.erosb.jsonsKema.Schema skemaObj;

private final Integer version;

private final List<SchemaReference> references;
Expand Down Expand Up @@ -288,40 +316,101 @@ public Schema rawSchema() {
}
if (schemaObj == null) {
try {
// Extract the $schema to use for determining the id keyword
SpecificationVersion spec = SpecificationVersion.DRAFT_7;
if (jsonNode.has(SCHEMA_KEYWORD)) {
String schema = jsonNode.get(SCHEMA_KEYWORD).asText();
if (schema != null) {
spec = SpecificationVersion.lookupByMetaSchemaUrl(schema)
.orElse(SpecificationVersion.DRAFT_7);
if (jsonNode.isBoolean()) {
schemaObj = jsonNode.booleanValue()
? TrueSchema.builder().build()
: FalseSchema.builder().build();
} else {
// Extract the $schema to use for determining the id keyword
SpecificationVersion spec = SpecificationVersion.DRAFT_7;
if (jsonNode.has(SCHEMA_KEYWORD)) {
String schema = jsonNode.get(SCHEMA_KEYWORD).asText();
SpecificationVersion s = SpecificationVersion.getFromUrl(schema);
if (s != null) {
spec = s;
}
}
}
// Extract the $id to use for resolving relative $ref URIs
URI idUri = null;
if (jsonNode.has(spec.idKeyword())) {
String id = jsonNode.get(spec.idKeyword()).asText();
if (id != null) {
idUri = ReferenceResolver.resolve((URI) null, id);
switch (spec) {
case DRAFT_2020_12:
case DRAFT_2019_09:
loadLatestDraft();
break;
default:
loadPreviousDraft(spec);
break;
}
}
SchemaLoader.SchemaLoaderBuilder builder = SchemaLoader.builder()
.useDefaults(true).draftV7Support();
for (Map.Entry<String, String> dep : resolvedReferences.entrySet()) {
URI child = ReferenceResolver.resolve(idUri, dep.getKey());
builder.registerSchemaByURI(child, new JSONObject(dep.getValue()));
}
JSONObject jsonObject = objectMapper.treeToValue(jsonNode, JSONObject.class);
builder.schemaJson(jsonObject);
SchemaLoader loader = builder.build();
schemaObj = loader.load().build();
} catch (IOException e) {
throw new IllegalArgumentException("Invalid JSON", e);
} catch (Throwable e) {
throw new IllegalArgumentException("Invalid JSON Schema", e);
}
}
return schemaObj;
}

private void loadLatestDraft() throws URISyntaxException {
URI idUri = null;
if (jsonNode.has("$id")) {
String id = jsonNode.get("$id").asText();
if (id != null) {
idUri = ReferenceResolver.resolve((URI) null, id);
}
} else {
idUri = new URI(DEFAULT_BASE_URI);
}
Map<URI, String> references = new HashMap<>();
for (Map.Entry<String, String> dep : resolvedReferences.entrySet()) {
URI child = ReferenceResolver.resolve(idUri, dep.getKey());
references.put(child, dep.getValue());
}
SchemaLoaderConfig config = new SchemaLoaderConfig(
new ReferenceSchemaClient(references), DEFAULT_BASE_URI);

JsonValue schemaJson = objectMapper.convertValue(jsonNode, JsonObject.class);
skemaObj = new com.github.erosb.jsonsKema.SchemaLoader(schemaJson, config).load();
SchemaTranslator.SchemaContext ctx = skemaObj.accept(new SchemaTranslator());
assert ctx != null;
ctx.close();
schemaObj = ctx.schema();
}

private void loadPreviousDraft(SpecificationVersion spec)
throws JsonProcessingException {
org.everit.json.schema.loader.SpecificationVersion loaderSpec =
org.everit.json.schema.loader.SpecificationVersion.DRAFT_7;
switch (spec) {
case DRAFT_7:
loaderSpec = org.everit.json.schema.loader.SpecificationVersion.DRAFT_7;
break;
case DRAFT_6:
loaderSpec = org.everit.json.schema.loader.SpecificationVersion.DRAFT_6;
break;
case DRAFT_4:
loaderSpec = org.everit.json.schema.loader.SpecificationVersion.DRAFT_4;
break;
default:
break;
}

// Extract the $id to use for resolving relative $ref URIs
URI idUri = null;
if (jsonNode.has(loaderSpec.idKeyword())) {
String id = jsonNode.get(loaderSpec.idKeyword()).asText();
if (id != null) {
idUri = ReferenceResolver.resolve((URI) null, id);
}
}
SchemaLoader.SchemaLoaderBuilder builder = SchemaLoader.builder()
.useDefaults(true).draftV7Support();
for (Map.Entry<String, String> dep : resolvedReferences.entrySet()) {
URI child = ReferenceResolver.resolve(idUri, dep.getKey());
builder.registerSchemaByURI(child, new JSONObject(dep.getValue()));
}
JSONObject jsonObject = objectMapper.treeToValue(jsonNode, JSONObject.class);
builder.schemaJson(jsonObject);
SchemaLoader loader = builder.build();
schemaObj = loader.load().build();
}

@Override
public String schemaType() {
return TYPE;
Expand Down Expand Up @@ -420,8 +509,45 @@ public void validate(boolean strict) {
}
}

public JsonNode validate(Object value) throws JsonProcessingException, ValidationException {
return validate(rawSchema(), value);
public JsonNode validate(JsonNode value) throws JsonProcessingException, ValidationException {
if (skemaObj != null) {
return validate(skemaObj, value);
} else {
return validate(rawSchema(), value);
}
}

public static JsonNode validate(com.github.erosb.jsonsKema.Schema schema, JsonNode value)
throws JsonProcessingException, ValidationException {
Validator validator = Validator.forSchema(schema);
JsonValue primitiveValue = null;
if (value instanceof BinaryNode) {
primitiveValue = new JsonString(((BinaryNode) value).asText(), UnknownSource.INSTANCE);
} else if (value instanceof BooleanNode) {
primitiveValue = new JsonBoolean(((BooleanNode) value).asBoolean(), UnknownSource.INSTANCE);
} else if (value instanceof NullNode) {
primitiveValue = new JsonNull(UnknownSource.INSTANCE);
} else if (value instanceof NumericNode) {
primitiveValue = new JsonNumber(((NumericNode) value).numberValue(), UnknownSource.INSTANCE);
} else if (value instanceof TextNode) {
primitiveValue = new JsonString(((TextNode) value).asText(), UnknownSource.INSTANCE);
}
ValidationFailure failure = null;
if (primitiveValue != null) {
failure = validator.validate(primitiveValue);
} else {
JsonValue jsonObject;
if (value instanceof ArrayNode) {
jsonObject = objectMapper.convertValue(value, JsonArray.class);
} else {
jsonObject = objectMapper.convertValue(value, JsonObject.class);
}
failure = validator.validate(jsonObject);
}
if (failure != null) {
throw new ValidationException(failure.toString());
}
return value;
}

public static JsonNode validate(Schema schema, Object value)
Expand Down Expand Up @@ -921,4 +1047,33 @@ private void modifySchemaTags(JsonNode node,
}
}
}

public static class ReferenceSchemaClient implements SchemaClient {

private Map<URI, String> references;

public ReferenceSchemaClient(Map<URI, String> references) {
this.references = references;
}

@Override
public InputStream get(URI uri) {
String reference = references.get(uri);
if (reference == null) {
throw new UncheckedIOException(new FileNotFoundException(uri.toString()));
}
return new ByteArrayInputStream(reference.getBytes(StandardCharsets.UTF_8));
}

@Override
public IJsonValue getParsed(URI uri) {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(get(uri), StandardCharsets.UTF_8))) {
String string = reader.lines().collect(Collectors.joining());
return new JsonParser(string, uri).parse();
} catch (Exception ex) {
throw new SchemaLoadingException("failed to parse json content returned from $uri", ex);
}
}
}
}

0 comments on commit 8944db1

Please sign in to comment.