Skip to content

Commit

Permalink
[BEAM-10587] Support Maps in BigQuery (#12389)
Browse files Browse the repository at this point in the history
  • Loading branch information
rworley-monster committed Oct 15, 2020
1 parent 6969faa commit 0e0ffac
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,30 @@ public abstract static class Builder {
}
}

/** Options for how to convert BigQuery schemas to Beam schemas. */
@AutoValue
public abstract static class SchemaConversionOptions implements Serializable {

  /**
   * Controls whether to use the map or row FieldType for a TableSchema field that appears to
   * represent a map (it is an array of structs containing only {@code key} and {@code value}
   * fields).
   */
  public abstract boolean getInferMaps();

  /** Returns a {@link Builder} with defaults applied: map inference is disabled. */
  public static Builder builder() {
    return new AutoValue_BigQueryUtils_SchemaConversionOptions.Builder().setInferMaps(false);
  }

  /** Builder for {@link SchemaConversionOptions}. */
  @AutoValue.Builder
  public abstract static class Builder {
    /** Sets whether map inference is enabled; see {@link #getInferMaps()}. */
    public abstract Builder setInferMaps(boolean inferMaps);

    public abstract SchemaConversionOptions build();
  }
}

private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PRINTER;

/**
Expand Down Expand Up @@ -200,6 +224,9 @@ public abstract static class Builder {
.put("SqlCharType", StandardSQLTypeName.STRING)
.build();

private static final String BIGQUERY_MAP_KEY_FIELD_NAME = "key";
private static final String BIGQUERY_MAP_VALUE_FIELD_NAME = "value";

/**
* Get the corresponding BigQuery {@link StandardSQLTypeName} for supported Beam {@link
* FieldType}.
Expand Down Expand Up @@ -235,7 +262,7 @@ private static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
*/
@Experimental(Kind.SCHEMAS)
private static FieldType fromTableFieldSchemaType(
String typeName, List<TableFieldSchema> nestedFields) {
String typeName, List<TableFieldSchema> nestedFields, SchemaConversionOptions options) {
switch (typeName) {
case "STRING":
return FieldType.STRING;
Expand Down Expand Up @@ -265,22 +292,36 @@ private static FieldType fromTableFieldSchemaType(
"SqlTimestampWithLocalTzType", FieldType.STRING, "", FieldType.DATETIME) {});
case "STRUCT":
case "RECORD":
Schema rowSchema = fromTableFieldSchema(nestedFields);
if (options.getInferMaps() && nestedFields.size() == 2) {
TableFieldSchema key = nestedFields.get(0);
TableFieldSchema value = nestedFields.get(1);
if (BIGQUERY_MAP_KEY_FIELD_NAME.equals(key.getName())
&& BIGQUERY_MAP_VALUE_FIELD_NAME.equals(value.getName())) {
return FieldType.map(
fromTableFieldSchemaType(key.getType(), key.getFields(), options),
fromTableFieldSchemaType(value.getType(), value.getFields(), options));
}
}

Schema rowSchema = fromTableFieldSchema(nestedFields, options);
return FieldType.row(rowSchema);
default:
throw new UnsupportedOperationException(
"Converting BigQuery type " + typeName + " to Beam type is unsupported");
}
}

private static Schema fromTableFieldSchema(List<TableFieldSchema> tableFieldSchemas) {
private static Schema fromTableFieldSchema(
List<TableFieldSchema> tableFieldSchemas, SchemaConversionOptions options) {
Schema.Builder schemaBuilder = Schema.builder();
for (TableFieldSchema tableFieldSchema : tableFieldSchemas) {
FieldType fieldType =
fromTableFieldSchemaType(tableFieldSchema.getType(), tableFieldSchema.getFields());
fromTableFieldSchemaType(
tableFieldSchema.getType(), tableFieldSchema.getFields(), options);

Optional<Mode> fieldMode = Optional.ofNullable(tableFieldSchema.getMode()).map(Mode::valueOf);
if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) {
if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()
&& !fieldType.getTypeName().isMapType()) {
fieldType = FieldType.array(fieldType);
}

Expand Down Expand Up @@ -322,7 +363,14 @@ private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
field.setFields(toTableFieldSchema(subType));
}
if (TypeName.MAP == type.getTypeName()) {
throw new IllegalArgumentException("Maps are not supported in BigQuery.");
Schema mapSchema =
Schema.builder()
.addField(BIGQUERY_MAP_KEY_FIELD_NAME, type.getMapKeyType())
.addField(BIGQUERY_MAP_VALUE_FIELD_NAME, type.getMapValueType())
.build();
type = FieldType.row(mapSchema);
field.setFields(toTableFieldSchema(mapSchema));
field.setMode(Mode.REPEATED.toString());
}
field.setType(toStandardSQLTypeName(type).toString());

Expand All @@ -340,7 +388,13 @@ public static TableSchema toTableSchema(Schema schema) {
/**
 * Convert a BigQuery {@link TableSchema} to a Beam {@link Schema}.
 *
 * <p>Uses default {@link SchemaConversionOptions} (map inference disabled), so a repeated
 * struct of {@code key}/{@code value} fields converts to an array of rows, not a Beam map.
 */
@Experimental(Kind.SCHEMAS)
public static Schema fromTableSchema(TableSchema tableSchema) {
  // Delegate to the options-accepting overload with the default options.
  return fromTableSchema(tableSchema, SchemaConversionOptions.builder().build());
}

/**
 * Convert a BigQuery {@link TableSchema} to a Beam {@link Schema}.
 *
 * @param tableSchema the BigQuery schema to convert
 * @param options controls the conversion, e.g. whether a repeated {@code key}/{@code value}
 *     struct is inferred as a Beam map field
 */
@Experimental(Kind.SCHEMAS)
public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOptions options) {
  return fromTableFieldSchema(tableSchema.getFields(), options);
}

/** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
Expand Down Expand Up @@ -445,6 +499,21 @@ public static TableRow toTableRow(Row row) {
}
return convertedItems;

case MAP:
FieldType keyElementType = fieldType.getMapKeyType();
FieldType valueElementType = fieldType.getMapValueType();
Map<?, ?> pairs = (Map<?, ?>) fieldValue;
convertedItems = Lists.newArrayListWithCapacity(pairs.size());
for (Map.Entry<?, ?> pair : pairs.entrySet()) {
convertedItems.add(
new TableRow()
.set(BIGQUERY_MAP_KEY_FIELD_NAME, fromBeamField(keyElementType, pair.getKey()))
.set(
BIGQUERY_MAP_VALUE_FIELD_NAME,
fromBeamField(valueElementType, pair.getValue())));
}
return convertedItems;

case ROW:
return toTableRow((Row) fieldValue);

Expand Down Expand Up @@ -643,7 +712,7 @@ public static Object convertAvroFormat(
case DECIMAL:
throw new RuntimeException("Does not support converting DECIMAL type value");
case MAP:
throw new RuntimeException("Does not support converting MAP type value");
return convertAvroRecordToMap(beamFieldType, avroValue, options);
default:
throw new RuntimeException(
"Does not support converting unknown type value: " + beamFieldTypeName);
Expand Down Expand Up @@ -681,6 +750,20 @@ private static Object convertAvroArray(
return ret;
}

/**
 * Converts the Avro representation of a BigQuery map — a list of {@code {key, value}} records —
 * into an immutable Java map, converting each key and value per the Beam field type.
 *
 * @param beamField the Beam MAP field type supplying the key and value element types
 * @param value the Avro value; expected to be a {@code List<GenericData.Record>}
 * @param options conversion options forwarded to {@link #convertAvroFormat}
 * @return an immutable map of converted keys to converted values
 */
private static Object convertAvroRecordToMap(
    FieldType beamField, Object value, BigQueryUtils.ConversionOptions options) {
  @SuppressWarnings("unchecked") // caller dispatches here only for MAP-typed Avro arrays
  List<GenericData.Record> records = (List<GenericData.Record>) value;
  ImmutableMap.Builder<Object, Object> ret = ImmutableMap.builder();
  FieldType keyElement = beamField.getMapKeyType();
  FieldType valueElement = beamField.getMapValueType();
  for (GenericData.Record record : records) {
    // NOTE(review): assumes positions 0/1 of each record are the key and value fields, matching
    // the {key, value} struct layout produced for map columns — confirm field ordering holds.
    ret.put(
        convertAvroFormat(keyElement, record.get(0), options),
        convertAvroFormat(valueElement, record.get(1), options));
  }
  // ImmutableMap.Builder throws on null keys/values and on duplicate keys.
  return ret.build();
}

private static Object convertAvroPrimitiveTypes(TypeName beamType, Object value) {
switch (beamType) {
case BYTE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.chrono.ISOChronology;
Expand All @@ -73,6 +74,9 @@ public class BigQueryUtilsTest {
.addNullableField("numeric", Schema.FieldType.DECIMAL)
.build();

// Row schema matching one BigQuery map entry: a struct of {key: STRING, value: DOUBLE}.
private static final Schema MAP_TYPE =
    Schema.builder().addStringField("key").addDoubleField("value").build();

private static final Schema ARRAY_TYPE =
Schema.builder().addArrayField("ids", Schema.FieldType.INT64).build();

Expand All @@ -82,6 +86,12 @@ public class BigQueryUtilsTest {
private static final Schema ARRAY_ROW_TYPE =
Schema.builder().addArrayField("rows", Schema.FieldType.row(FLAT_TYPE)).build();

// Beam view of the map column when map inference is OFF: an array of {key, value} rows.
private static final Schema MAP_ARRAY_TYPE =
    Schema.builder().addArrayField("map", Schema.FieldType.row(MAP_TYPE)).build();

// Beam view of the same column when map inference is ON: a true MAP<STRING, DOUBLE> field.
private static final Schema MAP_MAP_TYPE =
    Schema.builder().addMapField("map", Schema.FieldType.STRING, Schema.FieldType.DOUBLE).build();

private static final TableFieldSchema ID =
new TableFieldSchema().setName("id").setType(StandardSQLTypeName.INT64.toString());

Expand Down Expand Up @@ -123,6 +133,18 @@ public class BigQueryUtilsTest {
.setType(StandardSQLTypeName.INT64.toString())
.setMode(Mode.REPEATED.toString());

// Subfield schemas for the BigQuery struct that encodes one map entry.
private static final TableFieldSchema MAP_KEY =
    new TableFieldSchema()
        .setName("key")
        .setType(StandardSQLTypeName.STRING.toString())
        .setMode(Mode.REQUIRED.toString());

private static final TableFieldSchema MAP_VALUE =
    new TableFieldSchema()
        .setName("value")
        .setType(StandardSQLTypeName.FLOAT64.toString())
        .setMode(Mode.REQUIRED.toString());

private static final TableFieldSchema ROW =
new TableFieldSchema()
.setName("row")
Expand Down Expand Up @@ -159,6 +181,13 @@ public class BigQueryUtilsTest {
BINARY,
NUMERIC));

// A BigQuery column shaped like a map: a REPEATED STRUCT of {key, value}.
private static final TableFieldSchema MAP =
    new TableFieldSchema()
        .setName("map")
        .setType(StandardSQLTypeName.STRUCT.toString())
        .setMode(Mode.REPEATED.toString())
        .setFields(Arrays.asList(MAP_KEY, MAP_VALUE));

// Make sure that chosen BYTES test value is the same after a full base64 round trip.
private static final Row FLAT_ROW =
Row.withSchema(FLAT_TYPE)
Expand Down Expand Up @@ -219,6 +248,9 @@ public class BigQueryUtilsTest {
private static final Row ARRAY_ROW =
Row.withSchema(ARRAY_TYPE).addValues((Object) Arrays.asList(123L, 124L)).build();

// A Beam row holding a single one-entry map, used by the toTableRow map test.
private static final Row MAP_ROW =
    Row.withSchema(MAP_MAP_TYPE).addValues(ImmutableMap.of("test", 123.456)).build();

private static final TableRow BQ_ARRAY_ROW =
new TableRow()
.set(
Expand Down Expand Up @@ -259,6 +291,8 @@ public class BigQueryUtilsTest {
private static final TableSchema BQ_ARRAY_ROW_TYPE =
new TableSchema().setFields(Arrays.asList(ROWS));

private static final TableSchema BQ_MAP_TYPE = new TableSchema().setFields(Arrays.asList(MAP));

private static final Schema AVRO_FLAT_TYPE =
Schema.builder()
.addNullableField("id", Schema.FieldType.INT64)
Expand Down Expand Up @@ -347,6 +381,18 @@ public void testToTableSchema_array_row() {
NUMERIC));
}

/** A Beam MAP field should become one REPEATED STRUCT column with key/value subfields. */
@Test
public void testToTableSchema_map() {
  TableSchema schema = toTableSchema(MAP_MAP_TYPE);

  List<TableFieldSchema> fields = schema.getFields();
  assertThat(fields.size(), equalTo(1));

  TableFieldSchema mapField = fields.get(0);
  assertThat(mapField.getName(), equalTo("map"));
  assertThat(mapField.getType(), equalTo(StandardSQLTypeName.STRUCT.toString()));
  assertThat(mapField.getMode(), equalTo(Mode.REPEATED.toString()));
  assertThat(mapField.getFields(), containsInAnyOrder(MAP_KEY, MAP_VALUE));
}

@Test
public void testToTableRow_flat() {
TableRow row = toTableRow().apply(FLAT_ROW);
Expand All @@ -368,6 +414,17 @@ public void testToTableRow_array() {
assertThat(row.size(), equalTo(1));
}

@Test
public void testToTableRow_map() {
  TableRow row = toTableRow().apply(MAP_ROW);

  // The map serializes as a repeated struct: a list of {key, value} TableRows under "map".
  assertThat(row.size(), equalTo(1));
  row = ((List<TableRow>) row.get("map")).get(0);
  assertThat(row.size(), equalTo(2));
  // The double value is expected in string form ("123.456") after TableRow conversion.
  assertThat(row, hasEntry("key", "test"));
  assertThat(row, hasEntry("value", "123.456"));
}

@Test
public void testToTableRow_row() {
TableRow row = toTableRow().apply(ROW_ROW);
Expand Down Expand Up @@ -427,6 +484,9 @@ public void testToTableRow_null_row() {
.setTruncateTimestamps(TruncateTimestamps.REJECT)
.build();

// Options enabling key/value-struct → MAP inference in fromTableSchema.
private static final BigQueryUtils.SchemaConversionOptions INFER_MAPS_OPTIONS =
    BigQueryUtils.SchemaConversionOptions.builder().setInferMaps(true).build();

@Test
public void testSubMilliPrecisionRejected() {
assertThrows(
Expand Down Expand Up @@ -501,6 +561,18 @@ public void testFromTableSchema_array_row() {
assertEquals(ARRAY_ROW_TYPE, beamSchema);
}

/** Without map inference, the repeated key/value struct converts to an array of rows. */
@Test
public void testFromTableSchema_map_array() {
  Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_MAP_TYPE);
  assertEquals(MAP_ARRAY_TYPE, beamSchema);
}

/** With map inference enabled, the same BigQuery schema converts to a Beam MAP field. */
@Test
public void testFromTableSchema_map_map() {
  Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_MAP_TYPE, INFER_MAPS_OPTIONS);
  assertEquals(MAP_MAP_TYPE, beamSchema);
}

@Test
public void testToBeamRow_flat() {
Row beamRow = BigQueryUtils.toBeamRow(FLAT_TYPE, BQ_FLAT_ROW);
Expand Down

0 comments on commit 0e0ffac

Please sign in to comment.