This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit a806df8
Update the AvroCoder so that it can return true for isDeterministic.
This assumes that AvroCoder uses directBinaryEncoder, so arrays and maps
will be encoded as the number of elements followed by each element. They
are therefore deterministic if the underlying Array/Collection/Map is ordered.

This deals with several Avro annotations:
@Stringable: Not deterministic because we know nothing about #toString().
@AvroSchema: We aren't sure enough about this to be confident it will lead
to deterministic encodings.
@AvroEncode: Not deterministic because we know nothing about the encoder.
@AvroName: Recognizes that this causes a different field name to be used.

This doesn't address the case of GenericRecord or SpecificRecord from a given
schema.
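As a usage-level sketch of what this enables (DeterminismExample and MyRecord are hypothetical names; it assumes the AvroCoder.of(Class) factory and verifyDeterministic entry points shown in the diff below):

import com.google.cloud.dataflow.sdk.coders.AvroCoder;

import java.util.SortedMap;
import java.util.TreeMap;

public class DeterminismExample {
  // Hypothetical record type: the SortedMap field is acceptable because
  // Avro's MAP encoding (element count, then each entry) follows the
  // map's iteration order, which a TreeMap fixes by key.
  static class MyRecord {
    SortedMap<String, Long> counts = new TreeMap<>();
  }

  public static void main(String[] args) throws Exception {
    AvroCoder<MyRecord> coder = AvroCoder.of(MyRecord.class);
    // Passes after this change; a HashMap field would instead make
    // verifyDeterministic throw NonDeterministicException.
    coder.verifyDeterministic();
  }
}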

[]
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=87075588
bchambers authored and davorbonaci committed Feb 24, 2015
1 parent 7df9cbc commit a806df8
Showing 3 changed files with 679 additions and 16 deletions.
294 changes: 292 additions & 2 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AvroCoder.java
@@ -18,7 +18,9 @@

import static com.google.cloud.dataflow.sdk.util.Structs.addString;

import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.common.reflect.TypeToken;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -27,21 +29,35 @@
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.AvroEncode;
import org.apache.avro.reflect.AvroName;
import org.apache.avro.reflect.AvroSchema;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.reflect.Union;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;

/**
* An encoder using Avro binary format.
@@ -78,6 +94,11 @@
* ...
* }
* </code></pre>
* <p>
* The implementation attempts to determine if the Avro encoding of the given type will satisfy
* the criteria of {@link Coder#verifyDeterministic} by inspecting both the type and the
* Schema provided or generated by Avro. Only coders that are deterministic can be used in
* {@link GroupByKey} operations.
*
* @param <T> the type of elements handled by this coder
*/
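For instance (a sketch: WordStats is a hypothetical class, NonDeterministicException is the nested Coder.NonDeterministicException used elsewhere in this diff, and the reason text follows the reportError format in the checker further below):

import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;

import java.util.HashMap;
import java.util.Map;

public class NonDeterministicExample {
  // Hypothetical record type: HashMap iteration order is not guaranteed,
  // so the MAP branch of the determinism check reports it.
  static class WordStats {
    Map<String, Integer> wordCounts = new HashMap<>();
  }

  public static void main(String[] args) {
    try {
      AvroCoder.of(WordStats.class).verifyDeterministic();
    } catch (Coder.NonDeterministicException e) {
      // Expected: the message carries a reason along the lines of
      // "WordStats#wordCounts: ... may not be deterministically ordered".
      System.out.println(e.getMessage());
    }
  }
}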
@@ -124,6 +145,9 @@ public static AvroCoder<?> of(

private final Class<T> type;
private final Schema schema;

private final List<String> nonDeterministicReasons;

private final DatumWriter<T> writer;
private final DatumReader<T> reader;
private final EncoderFactory encoderFactory = new EncoderFactory();
@@ -132,6 +156,13 @@ public static AvroCoder<?> of(
protected AvroCoder(Class<T> type, Schema schema) {
this.type = type;
this.schema = schema;

if (GenericRecord.class.isAssignableFrom(type)) {
nonDeterministicReasons = Arrays.asList(
"GenericRecord may have non-deterministic fields.");
} else {
nonDeterministicReasons = new AvroDeterminismChecker().check(TypeToken.of(type), schema);
}
this.reader = createDatumReader();
this.writer = createDatumWriter();
}
@@ -170,11 +201,26 @@ public CloudObject asCloudObject() {
}

/**
* Depends upon the structure being serialized.
* Returns true if the given type should be deterministically encoded using
* the given Schema, the directBinaryEncoder, and the ReflectDatumWriter or
* GenericDatumWriter.
*/
@Override
@Deprecated
public boolean isDeterministic() {
return false;
return nonDeterministicReasons.isEmpty();
}

/**
* Raises an exception describing reasons why the type may not be deterministically
* encoded using the given Schema, the directBinaryEncoder, and the ReflectDatumWriter
* or GenericDatumWriter.
*/
@Override
public void verifyDeterministic() throws NonDeterministicException {
if (!nonDeterministicReasons.isEmpty()) {
throw new NonDeterministicException(this, nonDeterministicReasons);
}
}

/**
@@ -228,4 +274,248 @@ private Object readResolve() {
return new AvroCoder<T>(type, parser.parse(schemaStr));
}
}

/**
* Helper class encapsulating the various pieces of state maintained by the
* recursive walk used for checking if the encoding will be deterministic.
*/
protected static class AvroDeterminismChecker {

// Reasons that the original type is not deterministic. This accumulates
// the actual output.
private List<String> reasons = new ArrayList<>();

// Types that are currently "open". Used to make sure we don't have any
// recursive types. Note that we assume that all occurrences of a given type
// are equal, rather than tracking pairs of type + schema.
private Set<TypeToken<?>> activeTypes = new HashSet<>();

/**
* Report an error in the current context.
*/
private void reportError(String context, String fmt, Object... args) {
String message = String.format(fmt, args);
reasons.add(context + ": " + message);
}

/**
* Classes that are serialized by Avro using their toString() are only deterministic
* if their associated #toString() method is deterministic. Rather than determine all
* of them, we conservatively list some classes that definitely are, and treat any
* others as non-deterministic.
*/
private static final Set<Class<?>> DETERMINISTIC_STRINGABLE_CLASSES = new HashSet<>();
static {
DETERMINISTIC_STRINGABLE_CLASSES.add(java.math.BigDecimal.class);
DETERMINISTIC_STRINGABLE_CLASSES.add(java.math.BigInteger.class);
DETERMINISTIC_STRINGABLE_CLASSES.add(java.net.URI.class);
DETERMINISTIC_STRINGABLE_CLASSES.add(java.net.URL.class);
DETERMINISTIC_STRINGABLE_CLASSES.add(String.class);
}

/**
* Return true if the given type token is a subtype of *any* of the listed parents.
*/
private static boolean isSubtypeOf(TypeToken<?> type, Class<?>... parents) {
for (Class<?> parent : parents) {
if (TypeToken.of(parent).isAssignableFrom(type)) {
return true;
}
}
return false;
}

protected AvroDeterminismChecker() {}

// The entry point for the check. Should not be recursively called.
public List<String> check(TypeToken<?> type, Schema schema) {
recurse(type.getRawType().getName(), type, schema);
return reasons;
}

// This is the method that should be recursively called. It sets up the path
// and visited types correctly.
private void recurse(String context, TypeToken<?> type, Schema schema) {
if (type.getRawType().isAnnotationPresent(AvroSchema.class)) {
reportError(context, "Custom schemas are not supported -- remove @AvroSchema.");
return;
}

if (!activeTypes.add(type)) {
reportError(context, "%s appears recursively", type);
return;
}

doCheck(context, type, schema);
activeTypes.remove(type);
}

private void doCheck(String context, TypeToken<?> type, Schema schema) {
switch (schema.getType()) {
case ARRAY:
checkArray(context, type, schema);
break;
case ENUM:
// Enums should be deterministic, since they depend only on the ordinal.
break;
case FIXED:
// Depending on the implementation of GenericFixed, we don't know how
// the given field will be encoded. So, we assume that it isn't
// deterministic.
reportError(context, "FIXED encodings are not guaranteed to be deterministic");
break;
case MAP:
checkMap(context, type, schema);
break;
case RECORD:
checkRecord(context, type, schema);
break;
case UNION:
checkUnion(context, type, schema);
break;
case STRING:
checkString(context, type);
break;
case BOOLEAN:
case BYTES:
case DOUBLE:
case INT:
case FLOAT:
case LONG:
case NULL:
// For types that Avro encodes using one of the above primitives, we assume they are
// deterministic.
break;
default:
// In any other case (e.g., new types added to Avro) we conservatively
// report non-determinism.
reportError(context, "Unknown Avro Schema Type: %s", schema.getType());
break;
}
}

private void checkString(String context, TypeToken<?> type) {
// For types that are encoded as strings, we need to make sure they're in an approved
// whitelist. For other types that are annotated @Stringable, Avro will just use the
// #toString() method, which has no guarantee of determinism.
if (!DETERMINISTIC_STRINGABLE_CLASSES.contains(type.getRawType())) {
reportError(context, "%s may not have deterministic #toString()", type);
}
}

private void checkUnion(String context, TypeToken<?> type, Schema schema) {
if (!type.getRawType().isAnnotationPresent(Union.class)) {
reportError(context, "Expected type %s to have @Union annotation", type);
return;
}

// Errors associated with this union will use the base class as their context.
String baseClassContext = type.getRawType().getName();

// For a union, we need to make sure that each possible instantiation is deterministic.
for (Schema concrete : schema.getTypes()) {
@SuppressWarnings("unchecked")
TypeToken<?> unionType = TypeToken.of(ReflectData.get().getClass(concrete));

recurse(baseClassContext, unionType, concrete);
}
}

private void checkRecord(String context, TypeToken<?> type, Schema schema) {
// If the record isn't a true class, but rather a GenericRecord, SpecificRecord, etc.
// with a specified schema, then we need to make the decision based on the generated
// implementations.
if (isSubtypeOf(type, IndexedRecord.class)) {
// TODO: Update this once we support deterministic GenericRecord/SpecificRecords.
reportError(context, "IndexedRecords may be non-deterministic");
return;
}

// For a record, we want to make sure that all the fields are deterministic.
Class<?> clazz = type.getRawType();
for (org.apache.avro.Schema.Field fieldSchema : schema.getFields()) {
Field field = getField(clazz, fieldSchema.name());
String fieldContext = field.getDeclaringClass().getName() + "#" + field.getName();

if (field.isAnnotationPresent(AvroEncode.class)) {
reportError(fieldContext,
"Custom encoders may be non-deterministic -- remove @AvroEncode");
continue;
}

if (field.isAnnotationPresent(AvroSchema.class)) {
reportError(fieldContext, "Custom schemas are not supported -- remove @AvroSchema");
continue;
}

TypeToken<?> fieldType = type.resolveType(field.getGenericType());
recurse(fieldContext, fieldType, fieldSchema.schema());
}
}

private void checkMap(String context, TypeToken<?> type, Schema schema) {
if (!isSubtypeOf(type, SortedMap.class)) {
reportError(context, "%s may not be deterministically ordered", type);
}

// Avro (currently) asserts that all map keys are strings.
// In case that changes, we double-check that the key is in fact a String:
Class<?> keyType = type.resolveType(Map.class.getTypeParameters()[0]).getRawType();
if (!String.class.equals(keyType)) {
reportError(context, "map keys should be Strings, but was %s", keyType);
}

recurse(context,
type.resolveType(Map.class.getTypeParameters()[1]),
schema.getValueType());
}

private void checkArray(String context, TypeToken<?> type, Schema schema) {
TypeToken<?> elementType = null;
if (type.isArray()) {
// The type is an array (with ordering) -> deterministic iff the element is deterministic.
elementType = type.getComponentType();
} else if (isSubtypeOf(type, Collection.class)) {
if (isSubtypeOf(type, List.class, SortedSet.class)) {
// Ordered collection -> deterministic iff the element is deterministic
elementType = type.resolveType(Collection.class.getTypeParameters()[0]);
} else {
// Not an ordered collection -> not deterministic
reportError(context, "%s may not be deterministically ordered", type);
return;
}
} else {
// If it was an unknown type encoded as an array, be conservative and assume
// that we don't know anything about the order.
reportError(context, "encoding %s as an ARRAY was unexpected");
return;
}

// If we get here, it's either a deterministically-ordered Collection, or
// an array. Either way, the type is deterministic iff the element type is
// deterministic.
recurse(context, elementType, schema.getElementType());
}

/**
* Extract a field from a class. We need to look at the declared fields so that we can
* see private fields. We may need to walk up the class hierarchy to find fields declared in a parent class.
*/
private static Field getField(Class<?> clazz, String name) {
// Walk from the class itself up through its superclasses, keeping the
// original class available for the error message below.
for (Class<?> current = clazz; current != null; current = current.getSuperclass()) {
for (Field field : current.getDeclaredFields()) {
AvroName avroName = field.getAnnotation(AvroName.class);
if (avroName != null && name.equals(avroName.value())) {
return field;
} else if (avroName == null && name.equals(field.getName())) {
return field;
}
}
}

throw new IllegalArgumentException(
"Unable to get field " + name + " from class " + clazz);
}
}
}
sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java
@@ -28,8 +28,6 @@
import java.util.Collections;
import java.util.List;

import javax.annotation.Nullable;

/**
* A StandardCoder is one that defines equality, hashing, and printing
* via the class name and recursively using {@link #getComponents}.
@@ -148,8 +146,7 @@ public void registerByteSizeObserver(
@Override
public void verifyDeterministic() throws NonDeterministicException {
if (!isDeterministic()) {
throw new NonDeterministicException(this,
getClass().getSimpleName() + " reported it was not deterministic.");
throw new NonDeterministicException(this, "Coder reported it was not deterministic.");
}
}

@@ -168,13 +165,4 @@ protected void verifyDeterministic(String message, Coder<?>... coders)
throws NonDeterministicException {
verifyDeterministic(message, Arrays.asList(coders));
}

protected void addReasons(String prefix, List<String> accumulator,
@Nullable List<String> newReasons) {
if (newReasons != null) {
for (String reason : newReasons) {
accumulator.add(prefix + reason);
}
}
}
}
