Skip to content

Commit

Permalink
[BEAM-7829] Avoid name collisions in generated avro
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanSkraba committed Aug 28, 2019
1 parent 65b79fb commit 0871831
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Days;
Expand Down Expand Up @@ -161,8 +162,8 @@ public FieldType toBeamType() {
}

/** Convert to an AVRO type. */
public org.apache.avro.Schema toAvroType() {
return org.apache.avro.Schema.createFixed(null, "", "", size);
public org.apache.avro.Schema toAvroType(String name, String namespace) {
return org.apache.avro.Schema.createFixed(name, null, namespace, size);
}
}

Expand All @@ -174,12 +175,11 @@ public static Schema.Field toBeamField(org.apache.avro.Schema.Field field) {
}

/** Get Avro Field from Beam Field. */
public static org.apache.avro.Schema.Field toAvroField(Schema.Field field) {
org.apache.avro.Schema fieldSchema = getFieldSchema(field.getName(), field.getType());
org.apache.avro.Schema.Field avroField =
new org.apache.avro.Schema.Field(
field.getName(), fieldSchema, field.getDescription(), (Object) null);
return avroField;
public static org.apache.avro.Schema.Field toAvroField(Schema.Field field, String namespace) {
org.apache.avro.Schema fieldSchema =
getFieldSchema(field.getType(), field.getName(), namespace);
return new org.apache.avro.Schema.Field(
field.getName(), fieldSchema, field.getDescription(), (Object) null);
}

private AvroUtils() {}
Expand All @@ -204,18 +204,22 @@ public static Schema toBeamSchema(org.apache.avro.Schema schema) {
}

/** Converts a Beam Schema into an AVRO schema. */
private static org.apache.avro.Schema toAvroSchema(@Nullable String name, Schema beamSchema) {
final String schemaName = (name == null) ? "topLevelRecord" : name;
private static org.apache.avro.Schema toAvroSchema(
Schema beamSchema, @Nullable String name, @Nullable String namespace) {
final String schemaName = Strings.isNullOrEmpty(name) ? "topLevelRecord" : name;
final String schemaNamespace = namespace == null ? "" : namespace;
String childNamespace =
!"".equals(schemaNamespace) ? schemaNamespace + "." + schemaName : schemaName;
List<org.apache.avro.Schema.Field> fields = Lists.newArrayList();
for (Schema.Field field : beamSchema.getFields()) {
org.apache.avro.Schema.Field recordField = toAvroField(field);
org.apache.avro.Schema.Field recordField = toAvroField(field, childNamespace);
fields.add(recordField);
}
return org.apache.avro.Schema.createRecord(schemaName, null, null, false, fields);
return org.apache.avro.Schema.createRecord(schemaName, null, schemaNamespace, false, fields);
}

public static org.apache.avro.Schema toAvroSchema(Schema beamSchema) {
return toAvroSchema(null, beamSchema);
return toAvroSchema(beamSchema, null, null);
}

/**
Expand Down Expand Up @@ -543,7 +547,7 @@ private static Schema.FieldType toFieldType(TypeWithNullability type) {
}

private static org.apache.avro.Schema getFieldSchema(
String fieldName, Schema.FieldType fieldType) {
Schema.FieldType fieldType, String fieldName, String namespace) {
org.apache.avro.Schema baseType;
switch (fieldType.getTypeName()) {
case BYTE:
Expand Down Expand Up @@ -592,7 +596,7 @@ private static org.apache.avro.Schema getFieldSchema(
case LOGICAL_TYPE:
FixedBytesField fixedBytesField = FixedBytesField.fromBeamFieldType(fieldType);
if (fixedBytesField != null) {
baseType = fixedBytesField.toAvroType();
baseType = fixedBytesField.toAvroType("fixed", namespace + "." + fieldName);
} else {
throw new RuntimeException(
"Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
Expand All @@ -602,22 +606,22 @@ private static org.apache.avro.Schema getFieldSchema(
case ARRAY:
baseType =
org.apache.avro.Schema.createArray(
getFieldSchema(fieldName, fieldType.getCollectionElementType()));
getFieldSchema(fieldType.getCollectionElementType(), fieldName, namespace));
break;

case MAP:
if (fieldType.getMapKeyType().getTypeName().isStringType()) {
// Avro only supports string keys in maps.
baseType =
org.apache.avro.Schema.createMap(
getFieldSchema(fieldName, fieldType.getMapValueType()));
getFieldSchema(fieldType.getMapValueType(), fieldName, namespace));
} else {
throw new IllegalArgumentException("Avro only supports maps with string keys");
}
break;

case ROW:
baseType = toAvroSchema(fieldName, fieldType.getRowSchema());
baseType = toAvroSchema(fieldType.getRowSchema(), fieldName, namespace);
break;

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void testNullableBeamArrayFieldToAvroField() {
"",
null);

org.apache.avro.Schema.Field avroField = AvroUtils.toAvroField(beamField);
org.apache.avro.Schema.Field avroField = AvroUtils.toAvroField(beamField, "ignored");
assertEquals(expectedAvroField, avroField);
}

Expand All @@ -188,7 +188,8 @@ private static List<org.apache.avro.Schema.Field> getAvroSubSchemaFields() {
}

private static org.apache.avro.Schema getAvroSubSchema(String name) {
return org.apache.avro.Schema.createRecord(name, null, null, false, getAvroSubSchemaFields());
return org.apache.avro.Schema.createRecord(
name, null, "topLevelRecord", false, getAvroSubSchemaFields());
}

private static org.apache.avro.Schema getAvroSchema() {
Expand Down Expand Up @@ -340,10 +341,57 @@ public void testFromBeamSchema() {

@Test
public void testAvroSchemaFromBeamSchemaCanBeParsed() {
Schema beamSchema = getBeamSchema();
String stringSchema = AvroUtils.toAvroSchema(getBeamSchema()).toString();
org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(stringSchema);
assertEquals(stringSchema, schema.toString());
org.apache.avro.Schema convertedSchema = AvroUtils.toAvroSchema(getBeamSchema());
org.apache.avro.Schema validatedSchema =
new org.apache.avro.Schema.Parser().parse(convertedSchema.toString());
assertEquals(convertedSchema, validatedSchema);
}

@Test
public void testAvroSchemaFromBeamSchemaWithFieldCollisionCanBeParsed() {

// Two similar schemas, the only difference is the "street" field type in the nested record.
Schema contact =
new Schema.Builder()
.addField(Field.of("name", FieldType.STRING))
.addField(
Field.of(
"address",
FieldType.row(
new Schema.Builder()
.addField(Field.of("street", FieldType.STRING))
.addField(Field.of("city", FieldType.STRING))
.build())))
.build();

Schema contactMultiline =
new Schema.Builder()
.addField(Field.of("name", FieldType.STRING))
.addField(
Field.of(
"address",
FieldType.row(
new Schema.Builder()
.addField(Field.of("street", FieldType.array(FieldType.STRING)))
.addField(Field.of("city", FieldType.STRING))
.build())))
.build();

// Ensure that no collisions happen between two sibling fields with same-named child fields
// (with different schemas, between a parent field and a sub-record field with the same name,
// and artificially with the generated field name.
Schema beamSchema =
new Schema.Builder()
.addField(Field.of("home", FieldType.row(contact)))
.addField(Field.of("work", FieldType.row(contactMultiline)))
.addField(Field.of("address", FieldType.row(contact)))
.addField(Field.of("topLevelRecord", FieldType.row(contactMultiline)))
.build();

org.apache.avro.Schema convertedSchema = AvroUtils.toAvroSchema(beamSchema);
org.apache.avro.Schema validatedSchema =
new org.apache.avro.Schema.Parser().parse(convertedSchema.toString());
assertEquals(convertedSchema, validatedSchema);
}

@Test
Expand Down Expand Up @@ -469,7 +517,10 @@ public void testAvroSchemaCoders() {
false,
getAvroSubSchemaFields());
GenericRecord record =
new GenericRecordBuilder(getAvroSubSchema("simple")).set("bool", true).set("int", 42).build();
new GenericRecordBuilder(getAvroSubSchema("simple"))
.set("bool", true)
.set("int", 42)
.build();

PCollection<GenericRecord> records =
pipeline.apply(Create.of(record).withCoder(AvroCoder.of(schema)));
Expand Down

0 comments on commit 0871831

Please sign in to comment.