Add metadata/rules to schema format
rayokota committed Oct 12, 2022
1 parent e89951f commit ca2623f
Showing 30 changed files with 1,511 additions and 98 deletions.
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
@@ -10,7 +10,7 @@
files="ErrorMessage.java"/>

<suppress checks="ParameterNumber"
files="(AbstractKafkaProtobufSerializer|DynamicSchema|MessageDefinition|SchemaRegistryCoordinator).java"/>
files="(AbstractKafkaProtobufSerializer|DynamicSchema|MessageDefinition|ProtobufSchema|SchemaRegistryCoordinator).java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|CachedSchemaRegistryClient|RestService|Errors|SchemaRegistryRestApplication|Context|KafkaSchemaRegistry|KafkaStore|AvroConverter|AvroData|AvroSchemaUtils|KafkaGroupLeaderElector|ProtobufSchema|ProtobufData|JsonSchemaData|InMemoryCache|SchemaMessageReader|Jackson|JsonSchemaConverter|MetricsContainer|ProtobufSchema|ProtobufSchemaUtils).java"/>
@@ -22,7 +22,7 @@
files="(Errors|AvroMessageReader).java"/>

<suppress checks="CyclomaticComplexity"
files="(AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AvroSchema|AvroSchemaUtils|CompatibilityResource|Context|ContextKey|KafkaSchemaRegistry|KafkaStore|KafkaStoreMessageHandler|KafkaStoreReaderThread|AvroData|DownloadSchemaRegistryMojo|MockSchemaRegistryClient|SchemaRegistrySerializer|SchemaValue|SubjectVersionsResource|ProtobufSchema|SchemaDiff|FieldSchemaDiff|MessageSchemaDiff|DynamicSchema|SchemaMessageFormatter|ProtobufData|JsonSchema|JSON.*|AbstractKafkaJsonSchemaDeserializer|JsonSchemaData|JsonSchemaUtils|MessageDefinition|ProtobufSchemaUtils|SchemaMessageReader|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|SubjectKeyComparator|ContextFilter|QualifiedSubject).java"/>
files="(AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AvroSchema|AvroSchemaUtils|CompatibilityResource|Context|ContextKey|KafkaSchemaRegistry|KafkaStore|KafkaStoreMessageHandler|KafkaStoreReaderThread|AvroData|DownloadSchemaRegistryMojo|MockSchemaRegistryClient|SchemaRegistrySerializer|SchemaValue|SubjectVersionsResource|ProtobufSchema|SchemaDiff|FieldSchemaDiff|MessageSchemaDiff|DynamicSchema|SchemaMessageFormatter|ProtobufData|JsonSchema|JSON.*|AbstractKafkaJsonSchemaDeserializer|JsonSchemaData|JsonSchemaUtils|MessageDefinition|ProtobufSchemaUtils|SchemaMessageReader|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|SubjectKeyComparator|ContextFilter|QualifiedSubject|Schema).java"/>

<suppress checks="NPathComplexity"
files="(AvroData|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|MessageDefinition|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|ContextFilter|QualifiedSubject|SubjectVersionsResource).java"/>
ParsedSchema.java
@@ -16,6 +16,8 @@

package io.confluent.kafka.schemaregistry;

import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -62,13 +64,58 @@ default String formattedString(String format) {
throw new IllegalArgumentException("Format not supported: " + format);
}

/**
* Returns the version of the schema if set.
*
* @return the version
*/
Integer version();

/**
* Returns a list of schema references.
*
* @return the schema references
*/
List<SchemaReference> references();

/**
* Returns metadata.
*
* @return the metadata
*/
Metadata metadata();

/**
* Returns a rule set.
*
* @return the rule set
*/
RuleSet ruleSet();

/**
* Returns a copy of this schema.
*
* @return a copy of this schema
*/
ParsedSchema copy();

/**
* Returns a copy of this schema, but with the given version.
*
* @param version the version
* @return a copy of this schema, but with the given version
*/
ParsedSchema copy(Integer version);

/**
* Returns a copy of this schema, but with the given metadata and rule set.
*
* @param metadata the metadata
* @param ruleSet the rule set
* @return a copy of this schema, but with the given metadata and rule set
*/
ParsedSchema copy(Metadata metadata, RuleSet ruleSet);

/**
* Returns a normalized copy of this schema.
* Normalization generally ignores ordering when it is not significant.
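Taken together, the new accessors and copy overloads treat a parsed schema as an immutable value: attaching a version, metadata, or rules always yields a new instance. A minimal usage sketch, assuming only the signatures above plus the EMPTY_METADATA/EMPTY_RULESET sentinels statically imported later in this commit (the helper class itself is hypothetical):

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;

public class ParsedSchemaCopies {
  // Stamp a registry-assigned version onto a freshly parsed schema.
  static ParsedSchema withVersion(ParsedSchema schema, int version) {
    return schema.copy(version); // the receiver is left untouched
  }

  // Replace metadata and rules with the empty sentinels, e.g. before
  // a purely structural comparison of two schemas.
  static ParsedSchema withoutRules(ParsedSchema schema) {
    return schema.copy(Metadata.EMPTY_METADATA, RuleSet.EMPTY_RULESET);
  }
}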
AvroSchema.java
@@ -16,7 +16,12 @@

package io.confluent.kafka.schemaregistry.avro;

import static io.confluent.kafka.schemaregistry.client.rest.entities.Metadata.EMPTY_METADATA;
import static io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet.EMPTY_RULESET;

import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;

import java.util.HashMap;
@@ -46,6 +51,8 @@ public class AvroSchema implements ParsedSchema {
private final Integer version;
private final List<SchemaReference> references;
private final Map<String, String> resolvedReferences;
private final Metadata metadata;
private final RuleSet ruleSet;
private final boolean isNew;

private transient int hashCode = NO_HASHCODE;
@@ -68,6 +75,17 @@ public AvroSchema(String schemaString,
Map<String, String> resolvedReferences,
Integer version,
boolean isNew) {
this(schemaString, references, resolvedReferences,
EMPTY_METADATA, EMPTY_RULESET, version, isNew);
}

public AvroSchema(String schemaString,
List<SchemaReference> references,
Map<String, String> resolvedReferences,
Metadata metadata,
RuleSet ruleSet,
Integer version,
boolean isNew) {
this.isNew = isNew;
Schema.Parser parser = getParser();
for (String schema : resolvedReferences.values()) {
@@ -76,6 +94,8 @@ public AvroSchema(String schemaString,
this.schemaObj = parser.parse(schemaString);
this.references = Collections.unmodifiableList(references);
this.resolvedReferences = Collections.unmodifiableMap(resolvedReferences);
this.metadata = metadata;
this.ruleSet = ruleSet;
this.version = version;
}

@@ -88,6 +108,8 @@ public AvroSchema(Schema schemaObj, Integer version) {
this.schemaObj = schemaObj;
this.references = Collections.emptyList();
this.resolvedReferences = Collections.emptyMap();
this.metadata = EMPTY_METADATA;
this.ruleSet = EMPTY_RULESET;
this.version = version;
}

@@ -96,6 +118,8 @@ private AvroSchema(
String canonicalString,
List<SchemaReference> references,
Map<String, String> resolvedReferences,
Metadata metadata,
RuleSet ruleSet,
Integer version,
boolean isNew
) {
@@ -104,15 +128,48 @@ private AvroSchema(
this.canonicalString = canonicalString;
this.references = references;
this.resolvedReferences = resolvedReferences;
this.metadata = metadata;
this.ruleSet = ruleSet;
this.version = version;
}

@Override
public AvroSchema copy() {
return new AvroSchema(
this.schemaObj,
this.canonicalString,
this.references,
this.resolvedReferences,
this.metadata,
this.ruleSet,
this.version,
this.isNew
);
}

@Override
public AvroSchema copy(Integer version) {
return new AvroSchema(
this.schemaObj,
this.canonicalString,
this.references,
this.resolvedReferences,
this.metadata,
this.ruleSet,
version,
this.isNew
);
}

@Override
public AvroSchema copy(Metadata metadata, RuleSet ruleSet) {
return new AvroSchema(
this.schemaObj,
this.canonicalString,
this.references,
this.resolvedReferences,
metadata,
ruleSet,
this.version,
this.isNew
);
@@ -159,6 +216,7 @@ public String canonicalString() {
return canonicalString;
}

@Override
public Integer version() {
return version;
}
@@ -172,6 +230,16 @@ public Map<String, String> resolvedReferences() {
return resolvedReferences;
}

@Override
public Metadata metadata() {
return metadata;
}

@Override
public RuleSet ruleSet() {
return ruleSet;
}

public boolean isNew() {
return isNew;
}
@@ -220,6 +288,8 @@ public boolean equals(Object o) {
return Objects.equals(version, that.version)
&& Objects.equals(references, that.references)
&& Objects.equals(schemaObj, that.schemaObj)
&& Objects.equals(metadata, that.metadata)
&& Objects.equals(ruleSet, that.ruleSet)
&& metaEqual(schemaObj, that.schemaObj, new HashMap<>());
}

@@ -308,7 +378,7 @@ private boolean fieldMetaEqual(
@Override
public int hashCode() {
if (hashCode == NO_HASHCODE) {
hashCode = Objects.hash(schemaObj, references, version)
hashCode = Objects.hash(schemaObj, references, version, metadata, ruleSet)
+ metaHash(schemaObj, new IdentityHashMap<>());
}
return hashCode;
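The expanded constructor makes the Avro implementation concrete. A sketch using the empty sentinels; the record schema string and flag values are illustrative only:

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import java.util.Collections;

public class AvroSchemaExample {
  public static void main(String[] args) {
    AvroSchema schema = new AvroSchema(
        "{\"type\":\"record\",\"name\":\"User\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}",
        Collections.emptyList(),  // no schema references
        Collections.emptyMap(),   // nothing to resolve
        Metadata.EMPTY_METADATA,  // new parameter in this commit
        RuleSet.EMPTY_RULESET,    // new parameter in this commit
        null,                     // version not yet assigned
        false);                   // isNew, as in the pre-existing constructors
    System.out.println(schema.metadata() + " / " + schema.ruleSet());
  }
}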
AvroSchemaProvider.java
@@ -16,11 +16,10 @@

package io.confluent.kafka.schemaregistry.avro;

import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import java.util.Map;

import io.confluent.kafka.schemaregistry.AbstractSchemaProvider;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -51,6 +50,8 @@ public ParsedSchema parseSchemaOrElseThrow(Schema schema, boolean isNew) {
schema.getSchema(),
schema.getReferences(),
resolveReferences(schema.getReferences()),
schema.getMetadata(),
schema.getRuleSet(),
null,
validateDefaults && isNew
);
SchemaMetadata.java
@@ -16,6 +16,8 @@

package io.confluent.kafka.schemaregistry.client;

import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import java.util.Collections;
import java.util.List;
@@ -30,6 +32,8 @@ public class SchemaMetadata {
private String schemaType;
private String schema;
private List<SchemaReference> references;
private Metadata metadata = null;
private RuleSet ruleSet = null;

public SchemaMetadata(int id,
int version,
@@ -57,6 +61,8 @@ public SchemaMetadata(Schema schema) {
this.schemaType = schema.getSchemaType();
this.schema = schema.getSchema();
this.references = schema.getReferences();
this.metadata = schema.getMetadata();
this.ruleSet = schema.getRuleSet();
}

public int getId() {
@@ -78,4 +84,12 @@ public String getSchema() {
public List<SchemaReference> getReferences() {
return references;
}

public Metadata getMetadata() {
return this.metadata;
}

public RuleSet getRuleSet() {
return this.ruleSet;
}
}
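With the two getters in place, code that already fetches SchemaMetadata from the registry can inspect the attached metadata and rules. A sketch; both fields default to null, so entries written before this change need a null check:

import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;

public class SchemaMetadataExample {
  static void describe(SchemaMetadata sm) {
    Metadata md = sm.getMetadata();
    RuleSet rules = sm.getRuleSet();
    System.out.println("schema id " + sm.getId()
        + (md == null ? ", no metadata" : ", metadata: " + md)
        + (rules == null ? ", no rules" : ", rules: " + rules));
  }
}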
SchemaRegistryClient.java
@@ -16,6 +16,8 @@

package io.confluent.kafka.schemaregistry.client;

import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -36,8 +38,23 @@ Optional<ParsedSchema> parseSchema(
String schemaString,
List<SchemaReference> references);

default Optional<ParsedSchema> parseSchema(
String schemaType,
String schemaString,
List<SchemaReference> references,
Metadata metadata,
RuleSet ruleSet) {
return parseSchema(schemaType, schemaString, references).map(s -> s.copy(metadata, ruleSet));
}

default Optional<ParsedSchema> parseSchema(Schema schema) {
return parseSchema(schema.getSchemaType(), schema.getSchema(), schema.getReferences());
return parseSchema(
schema.getSchemaType(),
schema.getSchema(),
schema.getReferences(),
schema.getMetadata(),
schema.getRuleSet()
);
}

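Because both new parseSchema variants are default methods, every existing implementation of this interface (which, judging by the package and signatures shown, is SchemaRegistryClient) inherits metadata and rule-set handling for free: parsing a Schema entity copies its metadata and rule set onto the resulting ParsedSchema. A sketch under that assumption:

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import java.util.Optional;

public class ParseExample {
  static Optional<ParsedSchema> parse(SchemaRegistryClient client, Schema entity) {
    // Delegates to parseSchema(type, schema, references, metadata, ruleSet),
    // which attaches the entity's metadata and rules via ParsedSchema#copy.
    return client.parseSchema(entity);
  }
}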
