Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DGS-8542 Add support for JSON Schema Draft 2020-12 #2781

Merged
merged 4 commits into from Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions checkstyle/suppressions.xml
Expand Up @@ -13,7 +13,7 @@
files="(AbstractKafkaProtobufSerializer|DynamicSchema|MessageDefinition|ProtobufSchema|Rule|RuleContext|SchemaRegistryCoordinator).java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(AzureKmsClient|AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AbstractKafkaSchemaSerDe|CachedSchemaRegistryClient|MockSchemaRegistryClient|RestService|Errors|SchemaRegistryRestApplication|Context|KafkaSchemaRegistry|KafkaStore|AvroConverter|AvroData|AvroSchemaUtils|KafkaGroupLeaderElector|ProtobufSchema|ProtobufData|JsonSchemaData|InMemoryCache|SchemaMessageReader|Jackson|JsonSchemaConverter|MetricsContainer|ProtobufSchema|ProtobufSchemaUtils|MetadataEncoderService|ProtoFileElementDeserializer|DekRegistry).java"/>
files="(AzureKmsClient|AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AbstractKafkaSchemaSerDe|CachedSchemaRegistryClient|MockSchemaRegistryClient|RestService|Errors|SchemaRegistryRestApplication|Context|KafkaSchemaRegistry|KafkaStore|AvroConverter|AvroData|AvroSchemaUtils|KafkaGroupLeaderElector|JsonSchema|ProtobufSchema|ProtobufData|JsonSchemaData|InMemoryCache|SchemaMessageReader|Jackson|JsonSchemaConverter|MetricsContainer|ProtobufSchema|ProtobufSchemaUtils|MetadataEncoderService|ProtoFileElementDeserializer|DekRegistry).java"/>

<suppress checks="ClassFanOutComplexity"
files="(RestService|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|AvroData|KafkaGroupLeaderElector).java"/>
Expand All @@ -22,13 +22,13 @@
files="(Errors|AvroMessageReader).java"/>

<suppress checks="CyclomaticComplexity"
files="(AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AbstractKafkaSchemaSerDe|AvroSchema|AvroSchemaUtils|CompatibilityResource|Config|ConfigResource|ConfigUpdateRequest|ConfigValue|Context|ContextKey|KafkaSchemaRegistry|KafkaStore|KafkaStoreMessageHandler|KafkaStoreReaderThread|AvroData|DownloadSchemaRegistryMojo|MockSchemaRegistryClient|SchemaRegistrySerializer|SchemaValue|SubjectVersionsResource|ProtobufSchema|SchemaDiff|FieldSchemaDiff|MessageSchemaDiff|DynamicSchema|SchemaMessageFormatter|ProtobufData|JsonSchema|JSON.*|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaJsonSchemaSerializer|JsonSchemaData|JsonSchemaUtils|MessageDefinition|ProtobufSchemaUtils|SchemaMessageReader|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|SubjectKeyComparator|ContextFilter|QualifiedSubject|Schema|AvroTypeDescription|CelExecutor|DataEncryptionKeyId|EncryptionKeyId|EncryptionUpdateRequestHandler|FieldEncryptionExecutor|FieldRuleExecutor|Rule|WildcardMatcher|JsonSchemaComparator|DlqAction|LocalSchemaRegistryClient|RetryExecutor).java"/>
files="(AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AbstractKafkaSchemaSerDe|AvroSchema|AvroSchemaUtils|CompatibilityResource|Config|ConfigResource|ConfigUpdateRequest|ConfigValue|Context|ContextKey|KafkaSchemaRegistry|KafkaStore|KafkaStoreMessageHandler|KafkaStoreReaderThread|AvroData|DownloadSchemaRegistryMojo|MockSchemaRegistryClient|SchemaRegistrySerializer|SchemaValue|SubjectVersionsResource|ProtobufSchema|SchemaDiff|FieldSchemaDiff|MessageSchemaDiff|DynamicSchema|SchemaMessageFormatter|ProtobufData|JsonSchema|JSON.*|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaJsonSchemaSerializer|JsonSchemaData|JsonSchemaUtils|MessageDefinition|ProtobufSchemaUtils|SchemaMessageReader|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|SubjectKeyComparator|ContextFilter|QualifiedSubject|Schema|AvroTypeDescription|CelExecutor|DataEncryptionKeyId|EncryptionKeyId|EncryptionUpdateRequestHandler|FieldEncryptionExecutor|FieldRuleExecutor|Rule|WildcardMatcher|JsonSkemaArrayDeserializer|JsonSkemaObjectDeserializer|JsonSkemaObjectSerializer|JsonSchemaComparator|DlqAction|LocalSchemaRegistryClient|RetryExecutor|SchemaTranslator|SchemaUtils).java"/>

<suppress checks="NPathComplexity"
files="(AvroData|ConfigResource|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|MessageDefinition|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaSchemaSerDe|AbstractKafkaAvroSerializer|AbstractKafkaAvroDeserializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|ContextFilter|QualifiedSubject|SubjectVersionsResource|Rule|WildcardMatcher|JsonSchemaComparator|LocalSchemaRegistryClient|DataEncryptionKeyId|FieldEncryptionExecutor).java"/>
files="(AvroData|ConfigResource|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|MessageDefinition|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaSchemaSerDe|AbstractKafkaAvroSerializer|AbstractKafkaAvroDeserializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|ContextFilter|QualifiedSubject|SubjectVersionsResource|Rule|WildcardMatcher|JsonSchemaComparator|LocalSchemaRegistryClient|DataEncryptionKeyId|FieldEncryptionExecutor|SchemaTranslator|SchemaUtils).java"/>

<suppress checks="JavaNCSS"
files="(AbstractKafkaAvroSerializer|AbstractKafkaJsonSchemaSerializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|AbstractKafkaSchemaSerDe|AvroData|AvroSchema|AvroSchemaUtils|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchema|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema|ProtobufSchemaUtils|JsonSchemaComparator|SchemaMessageFormatter|SchemaMessageReader).java"/>
files="(AbstractKafkaAvroSerializer|AbstractKafkaJsonSchemaSerializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|AbstractKafkaSchemaSerDe|AvroData|AvroSchema|AvroSchemaUtils|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchema|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema|ProtobufSchemaUtils|JsonSchemaComparator|SchemaMessageFormatter|SchemaMessageReader|SchemaTranslator).java"/>

<suppress checks="MethodLength"
files="(AvroData|ProtobufSchema|ProtobufSchemaUtils).java"/>
Expand Down
5 changes: 5 additions & 0 deletions findbugs/findbugs-exclude.xml
Expand Up @@ -102,6 +102,11 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
<Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE"/>
</Match>

<Match>
<Class name="io.confluent.kafka.schemaregistry.json.schema.SchemaTranslator"/>
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
</Match>

<Match>
<Class name="~io.confluent.kafka.schemaregistry.client.security.SslFactory.*"/>
<Bug pattern="THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION"/>
Expand Down
4 changes: 4 additions & 0 deletions json-schema-provider/pom.xml
Expand Up @@ -35,6 +35,10 @@
<groupId>com.github.erosb</groupId>
<artifactId>everit-json-schema</artifactId>
</dependency>
<dependency>
<groupId>com.github.erosb</groupId>
<artifactId>json-sKema</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
Expand Down
Expand Up @@ -35,8 +35,24 @@
import com.fasterxml.jackson.databind.node.TextNode;
import com.fasterxml.jackson.databind.ser.BeanPropertyWriter;
import com.fasterxml.jackson.databind.ser.PropertyWriter;
import com.github.erosb.jsonsKema.IJsonValue;
import com.github.erosb.jsonsKema.JsonArray;
import com.github.erosb.jsonsKema.JsonBoolean;
import com.github.erosb.jsonsKema.JsonNull;
import com.github.erosb.jsonsKema.JsonNumber;
import com.github.erosb.jsonsKema.JsonObject;
import com.github.erosb.jsonsKema.JsonParser;
import com.github.erosb.jsonsKema.JsonString;
import com.github.erosb.jsonsKema.JsonValue;
import com.github.erosb.jsonsKema.SchemaClient;
import com.github.erosb.jsonsKema.SchemaLoaderConfig;
import com.github.erosb.jsonsKema.SchemaLoadingException;
import com.github.erosb.jsonsKema.UnknownSource;
import com.github.erosb.jsonsKema.ValidationFailure;
import com.github.erosb.jsonsKema.Validator;
import com.google.common.collect.Lists;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaEntity;
import io.confluent.kafka.schemaregistry.json.schema.SchemaTranslator;
import io.confluent.kafka.schemaregistry.rules.FieldTransform;
import io.confluent.kafka.schemaregistry.rules.RuleContext;
import io.confluent.kafka.schemaregistry.rules.RuleContext.FieldContext;
Expand All @@ -45,8 +61,16 @@
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
Expand All @@ -65,9 +89,9 @@
import org.everit.json.schema.ReferenceSchema;
import org.everit.json.schema.Schema;
import org.everit.json.schema.StringSchema;
import org.everit.json.schema.TrueSchema;
import org.everit.json.schema.ValidationException;
import org.everit.json.schema.loader.SchemaLoader;
import org.everit.json.schema.loader.SpecificationVersion;
import org.everit.json.schema.loader.internal.ReferenceResolver;
import org.json.JSONArray;
import org.json.JSONObject;
Expand Down Expand Up @@ -95,6 +119,8 @@ public class JsonSchema implements ParsedSchema {

private static final Logger log = LoggerFactory.getLogger(JsonSchema.class);

public static final String DEFAULT_BASE_URI = "mem://input";

public static final String TYPE = "JSON";

public static final String TAGS = "confluent:tags";
Expand All @@ -107,6 +133,8 @@ public class JsonSchema implements ParsedSchema {

private transient Schema schemaObj;

private transient com.github.erosb.jsonsKema.Schema skemaObj;

private final Integer version;

private final List<SchemaReference> references;
Expand Down Expand Up @@ -288,40 +316,101 @@ public Schema rawSchema() {
}
if (schemaObj == null) {
try {
// Extract the $schema to use for determining the id keyword
SpecificationVersion spec = SpecificationVersion.DRAFT_7;
if (jsonNode.has(SCHEMA_KEYWORD)) {
String schema = jsonNode.get(SCHEMA_KEYWORD).asText();
if (schema != null) {
spec = SpecificationVersion.lookupByMetaSchemaUrl(schema)
.orElse(SpecificationVersion.DRAFT_7);
if (jsonNode.isBoolean()) {
schemaObj = jsonNode.booleanValue()
? TrueSchema.builder().build()
: FalseSchema.builder().build();
} else {
// Extract the $schema to use for determining the id keyword
SpecificationVersion spec = SpecificationVersion.DRAFT_7;
if (jsonNode.has(SCHEMA_KEYWORD)) {
String schema = jsonNode.get(SCHEMA_KEYWORD).asText();
SpecificationVersion s = SpecificationVersion.getFromUrl(schema);
if (s != null) {
spec = s;
}
}
}
// Extract the $id to use for resolving relative $ref URIs
URI idUri = null;
if (jsonNode.has(spec.idKeyword())) {
String id = jsonNode.get(spec.idKeyword()).asText();
if (id != null) {
idUri = ReferenceResolver.resolve((URI) null, id);
switch (spec) {
case DRAFT_2020_12:
case DRAFT_2019_09:
loadLatestDraft();
break;
default:
loadPreviousDraft(spec);
break;
}
}
SchemaLoader.SchemaLoaderBuilder builder = SchemaLoader.builder()
.useDefaults(true).draftV7Support();
for (Map.Entry<String, String> dep : resolvedReferences.entrySet()) {
URI child = ReferenceResolver.resolve(idUri, dep.getKey());
builder.registerSchemaByURI(child, new JSONObject(dep.getValue()));
}
JSONObject jsonObject = objectMapper.treeToValue(jsonNode, JSONObject.class);
builder.schemaJson(jsonObject);
SchemaLoader loader = builder.build();
schemaObj = loader.load().build();
} catch (IOException e) {
throw new IllegalArgumentException("Invalid JSON", e);
} catch (Throwable e) {
throw new IllegalArgumentException("Invalid JSON Schema", e);
}
}
return schemaObj;
}

/**
 * Loads this schema with the json-sKema loader and derives the everit representation from it.
 *
 * <p>Assigns both {@code skemaObj} (the json-sKema schema) and {@code schemaObj} (the result
 * of running {@link SchemaTranslator} over it).
 *
 * @throws URISyntaxException if {@link #DEFAULT_BASE_URI} cannot be parsed as a URI
 */
private void loadLatestDraft() throws URISyntaxException {
  // Base URI against which reference names are resolved: the schema's own "$id" when it
  // declares one, otherwise the default in-memory base URI.
  final URI baseUri;
  if (!jsonNode.has("$id")) {
    baseUri = new URI(DEFAULT_BASE_URI);
  } else {
    String declaredId = jsonNode.get("$id").asText();
    baseUri = declaredId == null
        ? null
        : ReferenceResolver.resolve((URI) null, declaredId);
  }

  // Index the text of every resolved reference by its absolute URI so the schema client
  // can serve it from memory.
  Map<URI, String> schemaTextByUri = new HashMap<>();
  resolvedReferences.forEach((refName, refText) ->
      schemaTextByUri.put(ReferenceResolver.resolve(baseUri, refName), refText));

  SchemaLoaderConfig loaderConfig = new SchemaLoaderConfig(
      new ReferenceSchemaClient(schemaTextByUri), DEFAULT_BASE_URI);
  JsonValue rootJson = objectMapper.convertValue(jsonNode, JsonObject.class);
  skemaObj = new com.github.erosb.jsonsKema.SchemaLoader(rootJson, loaderConfig).load();

  // Translate the json-sKema schema into an everit schema.
  SchemaTranslator.SchemaContext translated = skemaObj.accept(new SchemaTranslator());
  assert translated != null;
  // The context is closed before its schema is read, matching the required call order.
  translated.close();
  schemaObj = translated.schema();
}

/**
 * Loads this schema with the everit loader and stores the result in {@code schemaObj}.
 *
 * @param spec the specification version detected for the schema; versions other than
 *     draft 4 and draft 6 are handled as draft 7
 * @throws JsonProcessingException if the schema JSON cannot be converted to a
 *     {@link JSONObject}
 */
private void loadPreviousDraft(SpecificationVersion spec)
    throws JsonProcessingException {
  // Map the detected version onto the everit loader's own enum.
  final org.everit.json.schema.loader.SpecificationVersion everitSpec;
  switch (spec) {
    case DRAFT_4:
      everitSpec = org.everit.json.schema.loader.SpecificationVersion.DRAFT_4;
      break;
    case DRAFT_6:
      everitSpec = org.everit.json.schema.loader.SpecificationVersion.DRAFT_6;
      break;
    case DRAFT_7:
    default:
      everitSpec = org.everit.json.schema.loader.SpecificationVersion.DRAFT_7;
      break;
  }

  // The id keyword depends on the draft; the id, when present, is the base for
  // resolving relative $ref URIs.
  String idKeyword = everitSpec.idKeyword();
  URI baseUri = null;
  if (jsonNode.has(idKeyword)) {
    String declaredId = jsonNode.get(idKeyword).asText();
    if (declaredId != null) {
      baseUri = ReferenceResolver.resolve((URI) null, declaredId);
    }
  }

  SchemaLoader.SchemaLoaderBuilder loaderBuilder = SchemaLoader.builder()
      .useDefaults(true).draftV7Support();
  // Register every resolved reference under its absolute URI.
  for (Map.Entry<String, String> reference : resolvedReferences.entrySet()) {
    loaderBuilder.registerSchemaByURI(
        ReferenceResolver.resolve(baseUri, reference.getKey()),
        new JSONObject(reference.getValue()));
  }

  JSONObject rootJson = objectMapper.treeToValue(jsonNode, JSONObject.class);
  schemaObj = loaderBuilder.schemaJson(rootJson).build().load().build();
}

@Override
public String schemaType() {
return TYPE;
Expand Down Expand Up @@ -420,8 +509,45 @@ public void validate(boolean strict) {
}
}

public JsonNode validate(Object value) throws JsonProcessingException, ValidationException {
return validate(rawSchema(), value);
/**
 * Validates the given JSON value against this schema.
 *
 * <p>The schema is loaded first when no json-sKema schema is cached yet, so the choice of
 * validator does not depend on whether {@link #rawSchema()} was called beforehand. If
 * loading produced a json-sKema schema, that schema is used for validation; otherwise the
 * everit schema returned by {@link #rawSchema()} is used.
 *
 * @param value the JSON value to validate
 * @return the value that was validated
 * @throws JsonProcessingException if the value cannot be processed for validation
 * @throws ValidationException if the value does not conform to the schema
 */
public JsonNode validate(JsonNode value) throws JsonProcessingException, ValidationException {
  // skemaObj is assigned only as a side effect of loading the schema. Checking it before
  // loading would make the first call fall back to the everit schema even when a
  // json-sKema schema is about to become available.
  Schema everitSchema = skemaObj == null ? rawSchema() : null;
  if (skemaObj != null) {
    return validate(skemaObj, value);
  }
  return validate(everitSchema, value);
}

/**
 * Validates a Jackson JSON value against a json-sKema schema.
 *
 * <p>The value is first converted to the json-sKema value model: scalar nodes are wrapped
 * directly, array nodes are converted to {@link JsonArray}, and every other node is
 * converted to {@link JsonObject}.
 *
 * @param schema the json-sKema schema to validate against
 * @param value the JSON value to validate
 * @return {@code value}, when validation reports no failure
 * @throws JsonProcessingException declared for parity with the other validate overloads
 * @throws ValidationException carrying the failure's string form when validation fails
 */
public static JsonNode validate(com.github.erosb.jsonsKema.Schema schema, JsonNode value)
    throws JsonProcessingException, ValidationException {
  Validator schemaValidator = Validator.forSchema(schema);

  // Convert the Jackson node into the json-sKema value model.
  final JsonValue converted;
  if (value instanceof TextNode || value instanceof BinaryNode) {
    converted = new JsonString(value.asText(), UnknownSource.INSTANCE);
  } else if (value instanceof NumericNode) {
    converted = new JsonNumber(value.numberValue(), UnknownSource.INSTANCE);
  } else if (value instanceof BooleanNode) {
    converted = new JsonBoolean(value.asBoolean(), UnknownSource.INSTANCE);
  } else if (value instanceof NullNode) {
    converted = new JsonNull(UnknownSource.INSTANCE);
  } else if (value instanceof ArrayNode) {
    converted = objectMapper.convertValue(value, JsonArray.class);
  } else {
    converted = objectMapper.convertValue(value, JsonObject.class);
  }

  ValidationFailure problem = schemaValidator.validate(converted);
  if (problem == null) {
    return value;
  }
  throw new ValidationException(problem.toString());
}

public static JsonNode validate(Schema schema, Object value)
Expand Down Expand Up @@ -921,4 +1047,33 @@ private void modifySchemaTags(JsonNode node,
}
}
}

/**
 * A {@link SchemaClient} that serves schema documents from an in-memory map keyed by URI.
 */
public static class ReferenceSchemaClient implements SchemaClient {

  private final Map<URI, String> references;

  /**
   * Creates a client backed by the given map.
   *
   * @param references schema text keyed by the URI under which it is requested; the map
   *     is used as-is, not copied
   */
  public ReferenceSchemaClient(Map<URI, String> references) {
    this.references = references;
  }

  /**
   * Returns the UTF-8 bytes of the schema registered under {@code uri}.
   *
   * @param uri the URI of the requested schema
   * @return a stream over the schema text
   * @throws UncheckedIOException wrapping a {@link FileNotFoundException} when no schema
   *     is registered under {@code uri}
   */
  @Override
  public InputStream get(URI uri) {
    String reference = references.get(uri);
    if (reference == null) {
      throw new UncheckedIOException(new FileNotFoundException(uri.toString()));
    }
    return new ByteArrayInputStream(reference.getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Reads and parses the schema registered under {@code uri}.
   *
   * @param uri the URI of the requested schema
   * @return the parsed JSON value
   * @throws SchemaLoadingException if the schema is missing or cannot be read or parsed
   */
  @Override
  public IJsonValue getParsed(URI uri) {
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(get(uri), StandardCharsets.UTF_8))) {
      String string = reader.lines().collect(Collectors.joining());
      return new JsonParser(string, uri).parse();
    } catch (Exception ex) {
      // Java does not interpolate "$uri" inside string literals, so the URI has to be
      // concatenated for it to appear in the message.
      throw new SchemaLoadingException(
          "failed to parse json content returned from " + uri, ex);
    }
  }
}
}