Skip to content

Commit

Permalink
[schema] Support compatibility check over all existing schemas. (#4214)
Browse files Browse the repository at this point in the history
Fixes #4170 

### Motivation

Currently the schema compatibility check is only done with the latest schema. That's probably not enough especially we now support schema versions for Struct schema. We need to the ability for check the compatibility with all existing schema version.

### Modifications

Add `FORWARD_TRANSITIVE`, `BACKWARD_TRANSITIVE` and `FULL_TRANSITIVE` compatibility strategy.

### Verifying this change

This change is already covered by existing tests, such as SchemaServiceTest, BaseAvroSchemaCompatibilityTest
  • Loading branch information
codelipenghui authored and sijie committed May 20, 2019
1 parent 0876106 commit 66fc48e
Show file tree
Hide file tree
Showing 13 changed files with 315 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,7 @@ public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
String id = TopicName.get(base).getSchemaName();
return brokerService.pulsar()
.getSchemaRegistryService()
.isCompatibleWithLatestVersion(id, schema, schemaCompatibilityStrategy);
.isCompatible(id, schema, schemaCompatibilityStrategy);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1942,7 +1942,7 @@ public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
String id = TopicName.get(base).getSchemaName();
return brokerService.pulsar()
.getSchemaRegistryService()
.isCompatibleWithLatestVersion(id, schema, schemaCompatibilityStrategy);
.isCompatible(id, schema, schemaCompatibilityStrategy);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;

import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.avro.SchemaValidationException;
Expand All @@ -34,45 +36,49 @@
*/
@Slf4j
abstract class AvroSchemaBasedCompatibilityCheck implements SchemaCompatibilityCheck {

@Override
public boolean isWellFormed(SchemaData to) {
try {
Schema.Parser toParser = new Schema.Parser();
Schema toSchema = toParser.parse(new String(to.getData(), UTF_8));
} catch (SchemaParseException e) {
log.error("Error during schema parsing: {}", e.getMessage(), e);
return false;
}
return true;
public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
return isCompatible(Collections.singletonList(from), to, strategy);
}

@Override
public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
public boolean isCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy) {
LinkedList<Schema> fromList = new LinkedList<>();
try {
Schema.Parser fromParser = new Schema.Parser();
Schema fromSchema = fromParser.parse(new String(from.getData(), UTF_8));
Schema.Parser toParser = new Schema.Parser();
Schema toSchema = toParser.parse(new String(to.getData(), UTF_8));

SchemaValidator schemaValidator = createSchemaValidator(strategy, true);
schemaValidator.validate(toSchema, Arrays.asList(fromSchema));
} catch (SchemaParseException | SchemaValidationException e) {
for (SchemaData schemaData : from) {
Schema.Parser parser = new Schema.Parser();
fromList.addFirst(parser.parse(new String(schemaData.getData(), UTF_8)));
}
Schema.Parser parser = new Schema.Parser();
Schema toSchema = parser.parse(new String(to.getData(), UTF_8));
SchemaValidator schemaValidator = createSchemaValidator(strategy);
schemaValidator.validate(toSchema, fromList);
} catch (SchemaParseException e) {
log.error("Error during schema parsing: {}", e.getMessage(), e);
return false;
} catch (SchemaValidationException e) {
log.error("Error during schema compatibility check: {}", e.getMessage(), e);
return false;
}
return true;
}

static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy,
boolean onlyLatestValidator) {
static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy) {
final SchemaValidatorBuilder validatorBuilder = new SchemaValidatorBuilder();
switch (compatibilityStrategy) {
case BACKWARD_TRANSITIVE:
return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), false);
case BACKWARD:
return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), onlyLatestValidator);
return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), true);
case FORWARD_TRANSITIVE:
return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), false);
case FORWARD:
return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), true);
case FULL_TRANSITIVE:
return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), false);
case FULL:
return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), true);
case ALWAYS_COMPATIBLE:
return AlwaysSchemaValidator.INSTANCE;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -38,6 +39,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -118,6 +120,33 @@ public CompletableFuture<StoredSchema> get(String key, SchemaVersion version) {
}
}

@Override
public CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key) {
CompletableFuture<List<CompletableFuture<StoredSchema>>> result = new CompletableFuture<>();
getSchemaLocator(getSchemaPath(key)).thenAccept(locator -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Get all schemas - locator: {}", key, locator);
}

if (!locator.isPresent()) {
result.complete(Collections.emptyList());
}

SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator;
List<CompletableFuture<StoredSchema>> list = new ArrayList<>();
schemaLocator.getIndexList().forEach(indexEntry -> list.add(readSchemaEntry(indexEntry.getPosition())
.thenApply(entry -> new StoredSchema
(
entry.getSchemaData().toByteArray(),
new LongSchemaVersion(schemaLocator.getInfo().getVersion())
)
)
));
result.complete(list);
});
return result;
}

@Override
public CompletableFuture<SchemaVersion> delete(String key) {
return deleteSchema(key).thenApply(LongSchemaVersion::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaVersion;
Expand All @@ -35,6 +37,12 @@ public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVer
return completedFuture(null);
}

@Override
public CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> getAllSchemas(String schemaId) {
return completedFuture(Collections.emptyList());
}


@Override
public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy) {
Expand All @@ -47,14 +55,13 @@ public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String use
}

@Override
public SchemaVersion versionFromBytes(byte[] version) {
return null;
public CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy) {
return completedFuture(false);
}

@Override
public CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy) {
return CompletableFuture.completedFuture(true);
public SchemaVersion versionFromBytes(byte[] version) {
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,51 @@
import org.apache.pulsar.common.schema.SchemaType;

public interface SchemaCompatibilityCheck {

SchemaType getSchemaType();

/**
*
* @param from the current schema i.e. schema that the broker has
* @param to the future schema i.e. the schema sent by the producer
* @return whether the schemas are well-formed
* @param strategy the strategy to use when comparing schemas
* @return whether the schemas are compatible
*/
boolean isWellFormed(SchemaData to);
boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy);

/**
*
* @param from the current schema i.e. schema that the broker has
* @param from the current schemas i.e. schemas that the broker has
* @param to the future schema i.e. the schema sent by the producer
* @param strategy the strategy to use when comparing schemas
* @return whether the schemas are compatible
*/
boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy);
boolean isCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy);

SchemaCompatibilityCheck DEFAULT = new SchemaCompatibilityCheck() {

@Override
public SchemaType getSchemaType() {
return SchemaType.NONE;
}

@Override
public boolean isWellFormed(SchemaData to) {
return true;
public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
if (strategy == SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE) {
return false;
} else {
return true;
}
}

@Override
public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
public boolean isCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy) {
if (strategy == SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE) {
return false;
} else {
return true;
}
}

};
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,49 @@ public enum SchemaCompatibilityStrategy {
/**
* Equivalent to both FORWARD and BACKWARD
*/
FULL;
FULL,

/**
* Be similar to BACKWARD, BACKWARD_TRANSITIVE ensure all previous version schema can
* be read by the new schema.
*/
BACKWARD_TRANSITIVE,

/**
* Be similar to FORWARD, FORWARD_TRANSITIVE ensure new schema can be ready by all previous
* version schema.
*/
FORWARD_TRANSITIVE,

/**
* Equivalent to both FORWARD_TRANSITIVE and BACKWARD_TRANSITIVE
*/
FULL_TRANSITIVE;



public static SchemaCompatibilityStrategy fromAutoUpdatePolicy(SchemaAutoUpdateCompatibilityStrategy strategy) {
if (strategy == null) {
return SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE;
}
switch (strategy) {
case Backward:
return BACKWARD;
case Forward:
return FORWARD;
case Full:
return FULL;
case AlwaysCompatible:
return ALWAYS_COMPATIBLE;
case AutoUpdateDisabled:
default:
return ALWAYS_INCOMPATIBLE;
case Backward:
return BACKWARD;
case Forward:
return FORWARD;
case Full:
return FULL;
case AlwaysCompatible:
return ALWAYS_COMPATIBLE;
case ForwardTransitive:
return FORWARD_TRANSITIVE;
case BackwardTransitive:
return BACKWARD_TRANSITIVE;
case FullTransitive:
return FULL_TRANSITIVE;
case AutoUpdateDisabled:
default:
return ALWAYS_INCOMPATIBLE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.service.schema;

import com.google.common.base.MoreObjects;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.schema.SchemaData;
Expand All @@ -30,12 +32,14 @@ public interface SchemaRegistry extends AutoCloseable {

CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVersion version);

CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> getAllSchemas(String schemaId);

CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy);

CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user);

CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema,
CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy);

SchemaVersion versionFromBytes(byte[] version);
Expand Down
Loading

0 comments on commit 66fc48e

Please sign in to comment.