[BEAM-10587] Support Maps in BigQuery #12389

Merged (10 commits) on Oct 15, 2020
@@ -99,6 +99,30 @@ public abstract static class Builder {
}
}

/** Options for how to convert BigQuery schemas to Beam schemas. */
@AutoValue
public abstract static class SchemaConversionOptions implements Serializable {

/**
* Controls whether to use the map or row FieldType for a TableSchema field that appears to
* represent a map (i.e., an array of structs containing only {@code key} and {@code value}
* fields).
*/
public abstract boolean getInferMaps();

public static Builder builder() {
return new AutoValue_BigQueryUtils_SchemaConversionOptions.Builder().setInferMaps(false);
}

/** Builder for {@link SchemaConversionOptions}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setInferMaps(boolean inferMaps);

public abstract SchemaConversionOptions build();
}
}
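A minimal usage sketch, not part of the diff itself: map inference is opt-in, since the builder above defaults inferMaps to false.

SchemaConversionOptions options =
    SchemaConversionOptions.builder().setInferMaps(true).build();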

private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PRINTER;

/**
@@ -200,6 +224,9 @@ public abstract static class Builder {
.put("SqlCharType", StandardSQLTypeName.STRING)
.build();

private static final String BIGQUERY_MAP_KEY_FIELD_NAME = "key";
private static final String BIGQUERY_MAP_VALUE_FIELD_NAME = "value";

/**
* Get the corresponding BigQuery {@link StandardSQLTypeName} for supported Beam {@link
* FieldType}.
@@ -235,7 +262,7 @@ private static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
*/
@Experimental(Kind.SCHEMAS)
private static FieldType fromTableFieldSchemaType(
String typeName, List<TableFieldSchema> nestedFields) {
String typeName, List<TableFieldSchema> nestedFields, SchemaConversionOptions options) {
switch (typeName) {
case "STRING":
return FieldType.STRING;
@@ -265,22 +292,36 @@ private static FieldType fromTableFieldSchemaType(
"SqlTimestampWithLocalTzType", FieldType.STRING, "", FieldType.DATETIME) {});
case "STRUCT":
case "RECORD":
Schema rowSchema = fromTableFieldSchema(nestedFields);
if (options.getInferMaps() && nestedFields.size() == 2) {
TableFieldSchema key = nestedFields.get(0);
TableFieldSchema value = nestedFields.get(1);
if (BIGQUERY_MAP_KEY_FIELD_NAME.equals(key.getName())
&& BIGQUERY_MAP_VALUE_FIELD_NAME.equals(value.getName())) {
return FieldType.map(
fromTableFieldSchemaType(key.getType(), key.getFields(), options),
fromTableFieldSchemaType(value.getType(), value.getFields(), options));
}
}

Schema rowSchema = fromTableFieldSchema(nestedFields, options);
return FieldType.row(rowSchema);
default:
throw new UnsupportedOperationException(
"Converting BigQuery type " + typeName + " to Beam type is unsupported");
}
}

private static Schema fromTableFieldSchema(List<TableFieldSchema> tableFieldSchemas) {
private static Schema fromTableFieldSchema(
List<TableFieldSchema> tableFieldSchemas, SchemaConversionOptions options) {
Schema.Builder schemaBuilder = Schema.builder();
for (TableFieldSchema tableFieldSchema : tableFieldSchemas) {
FieldType fieldType =
fromTableFieldSchemaType(tableFieldSchema.getType(), tableFieldSchema.getFields());
fromTableFieldSchemaType(
tableFieldSchema.getType(), tableFieldSchema.getFields(), options);

Optional<Mode> fieldMode = Optional.ofNullable(tableFieldSchema.getMode()).map(Mode::valueOf);
if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) {
if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()
&& !fieldType.getTypeName().isMapType()) {
fieldType = FieldType.array(fieldType);
}

@@ -322,7 +363,14 @@ private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
field.setFields(toTableFieldSchema(subType));
}
if (TypeName.MAP == type.getTypeName()) {
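// Beam MAP fields have no direct BigQuery counterpart, so represent them as a repeated STRUCT
// with only "key" and "value" subfields; SchemaConversionOptions#getInferMaps can recover the
// map shape on the way back.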
throw new IllegalArgumentException("Maps are not supported in BigQuery.");
Schema mapSchema =
Schema.builder()
.addField(BIGQUERY_MAP_KEY_FIELD_NAME, type.getMapKeyType())
.addField(BIGQUERY_MAP_VALUE_FIELD_NAME, type.getMapValueType())
.build();
type = FieldType.row(mapSchema);
field.setFields(toTableFieldSchema(mapSchema));
field.setMode(Mode.REPEATED.toString());
}
field.setType(toStandardSQLTypeName(type).toString());

@@ -340,7 +388,13 @@ public static TableSchema toTableSchema(Schema schema) {
/** Convert a BigQuery {@link TableSchema} to a Beam {@link Schema}. */
@Experimental(Kind.SCHEMAS)
public static Schema fromTableSchema(TableSchema tableSchema) {
return fromTableFieldSchema(tableSchema.getFields());
return fromTableSchema(tableSchema, SchemaConversionOptions.builder().build());
}

/** Convert a BigQuery {@link TableSchema} to a Beam {@link Schema}. */
@Experimental(Kind.SCHEMAS)
public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOptions options) {
return fromTableFieldSchema(tableSchema.getFields(), options);
}
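For illustration, a sketch (names are illustrative, not part of the change) of how the two fromTableSchema overloads treat a map-shaped field, assuming a repeated STRUCT with REQUIRED key and value subfields, the same shape exercised by the tests further down:

TableFieldSchema mapLike =
    new TableFieldSchema()
        .setName("map")
        .setType("STRUCT")
        .setMode("REPEATED")
        .setFields(
            Arrays.asList(
                new TableFieldSchema().setName("key").setType("STRING").setMode("REQUIRED"),
                new TableFieldSchema().setName("value").setType("FLOAT64").setMode("REQUIRED")));
TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(mapLike));

// Without options, the field stays an ARRAY<ROW<key STRING, value DOUBLE>>.
Schema asArrayOfRows = BigQueryUtils.fromTableSchema(bqSchema);

// With inferMaps enabled, the same field is inferred as MAP<STRING, DOUBLE>.
Schema asMap =
    BigQueryUtils.fromTableSchema(
        bqSchema, BigQueryUtils.SchemaConversionOptions.builder().setInferMaps(true).build());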

/** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
@@ -445,6 +499,21 @@ public static TableRow toTableRow(Row row) {
}
return convertedItems;

case MAP:
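// Mirror the schema conversion: emit the map as a list of TableRows, one {key, value} entry
// per map element, using the shared BigQuery map field names.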
FieldType keyElementType = fieldType.getMapKeyType();
FieldType valueElementType = fieldType.getMapValueType();
Map<?, ?> pairs = (Map<?, ?>) fieldValue;
convertedItems = Lists.newArrayListWithCapacity(pairs.size());
for (Map.Entry<?, ?> pair : pairs.entrySet()) {
convertedItems.add(
new TableRow()
.set(BIGQUERY_MAP_KEY_FIELD_NAME, fromBeamField(keyElementType, pair.getKey()))
.set(
BIGQUERY_MAP_VALUE_FIELD_NAME,
fromBeamField(valueElementType, pair.getValue())));
}
return convertedItems;

case ROW:
return toTableRow((Row) fieldValue);

@@ -643,7 +712,7 @@ public static Object convertAvroFormat(
case DECIMAL:
throw new RuntimeException("Does not support converting DECIMAL type value");
case MAP:
throw new RuntimeException("Does not support converting MAP type value");
return convertAvroRecordToMap(beamFieldType, avroValue, options);
default:
throw new RuntimeException(
"Does not support converting unknown type value: " + beamFieldTypeName);
@@ -681,6 +750,20 @@ private static Object convertAvroArray(
return ret;
}

private static Object convertAvroRecordToMap(
FieldType beamField, Object value, BigQueryUtils.ConversionOptions options) {
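// The Avro encoding of a BigQuery-style map is an array of {key, value} records; rebuild the
// Beam map by converting each record's two positional fields with the caller's options.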
List<GenericData.Record> records = (List<GenericData.Record>) value;
ImmutableMap.Builder<Object, Object> ret = ImmutableMap.builder();
FieldType keyElement = beamField.getMapKeyType();
FieldType valueElement = beamField.getMapValueType();
for (GenericData.Record record : records) {
ret.put(
convertAvroFormat(keyElement, record.get(0), options),
convertAvroFormat(valueElement, record.get(1), options));
}
return ret.build();
}

private static Object convertAvroPrimitiveTypes(TypeName beamType, Object value) {
switch (beamType) {
case BYTE:
@@ -48,6 +48,7 @@
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.chrono.ISOChronology;
@@ -73,6 +74,9 @@ public class BigQueryUtilsTest {
.addNullableField("numeric", Schema.FieldType.DECIMAL)
.build();

private static final Schema MAP_TYPE =
Schema.builder().addStringField("key").addDoubleField("value").build();

private static final Schema ARRAY_TYPE =
Schema.builder().addArrayField("ids", Schema.FieldType.INT64).build();

@@ -82,6 +86,12 @@ public class BigQueryUtilsTest {
private static final Schema ARRAY_ROW_TYPE =
Schema.builder().addArrayField("rows", Schema.FieldType.row(FLAT_TYPE)).build();

private static final Schema MAP_ARRAY_TYPE =
Schema.builder().addArrayField("map", Schema.FieldType.row(MAP_TYPE)).build();

private static final Schema MAP_MAP_TYPE =
Schema.builder().addMapField("map", Schema.FieldType.STRING, Schema.FieldType.DOUBLE).build();

private static final TableFieldSchema ID =
new TableFieldSchema().setName("id").setType(StandardSQLTypeName.INT64.toString());

@@ -123,6 +133,18 @@ public class BigQueryUtilsTest {
.setType(StandardSQLTypeName.INT64.toString())
.setMode(Mode.REPEATED.toString());

private static final TableFieldSchema MAP_KEY =
new TableFieldSchema()
.setName("key")
.setType(StandardSQLTypeName.STRING.toString())
.setMode(Mode.REQUIRED.toString());

private static final TableFieldSchema MAP_VALUE =
new TableFieldSchema()
.setName("value")
.setType(StandardSQLTypeName.FLOAT64.toString())
.setMode(Mode.REQUIRED.toString());

private static final TableFieldSchema ROW =
new TableFieldSchema()
.setName("row")
@@ -159,6 +181,13 @@ public class BigQueryUtilsTest {
BINARY,
NUMERIC));

private static final TableFieldSchema MAP =
new TableFieldSchema()
.setName("map")
.setType(StandardSQLTypeName.STRUCT.toString())
.setMode(Mode.REPEATED.toString())
.setFields(Arrays.asList(MAP_KEY, MAP_VALUE));

// Make sure that chosen BYTES test value is the same after a full base64 round trip.
private static final Row FLAT_ROW =
Row.withSchema(FLAT_TYPE)
@@ -219,6 +248,9 @@ public class BigQueryUtilsTest {
private static final Row ARRAY_ROW =
Row.withSchema(ARRAY_TYPE).addValues((Object) Arrays.asList(123L, 124L)).build();

private static final Row MAP_ROW =
Row.withSchema(MAP_MAP_TYPE).addValues(ImmutableMap.of("test", 123.456)).build();

private static final TableRow BQ_ARRAY_ROW =
new TableRow()
.set(
@@ -259,6 +291,8 @@ public class BigQueryUtilsTest {
private static final TableSchema BQ_ARRAY_ROW_TYPE =
new TableSchema().setFields(Arrays.asList(ROWS));

private static final TableSchema BQ_MAP_TYPE = new TableSchema().setFields(Arrays.asList(MAP));

private static final Schema AVRO_FLAT_TYPE =
Schema.builder()
.addNullableField("id", Schema.FieldType.INT64)
@@ -347,6 +381,18 @@ public void testToTableSchema_array_row() {
NUMERIC));
}

@Test
public void testToTableSchema_map() {
TableSchema schema = toTableSchema(MAP_MAP_TYPE);

assertThat(schema.getFields().size(), equalTo(1));
TableFieldSchema field = schema.getFields().get(0);
assertThat(field.getName(), equalTo("map"));
assertThat(field.getType(), equalTo(StandardSQLTypeName.STRUCT.toString()));
assertThat(field.getMode(), equalTo(Mode.REPEATED.toString()));
assertThat(field.getFields(), containsInAnyOrder(MAP_KEY, MAP_VALUE));
}

@Test
public void testToTableRow_flat() {
TableRow row = toTableRow().apply(FLAT_ROW);
@@ -368,6 +414,17 @@ public void testToTableRow_array() {
assertThat(row.size(), equalTo(1));
}

@Test
public void testToTableRow_map() {
TableRow row = toTableRow().apply(MAP_ROW);

assertThat(row.size(), equalTo(1));
row = ((List<TableRow>) row.get("map")).get(0);
assertThat(row.size(), equalTo(2));
assertThat(row, hasEntry("key", "test"));
assertThat(row, hasEntry("value", "123.456"));
}

@Test
public void testToTableRow_row() {
TableRow row = toTableRow().apply(ROW_ROW);
@@ -427,6 +484,9 @@ public void testToTableRow_null_row() {
.setTruncateTimestamps(TruncateTimestamps.REJECT)
.build();

private static final BigQueryUtils.SchemaConversionOptions INFER_MAPS_OPTIONS =
BigQueryUtils.SchemaConversionOptions.builder().setInferMaps(true).build();

@Test
public void testSubMilliPrecisionRejected() {
assertThrows(
@@ -501,6 +561,18 @@ public void testFromTableSchema_array_row() {
assertEquals(ARRAY_ROW_TYPE, beamSchema);
}

@Test
public void testFromTableSchema_map_array() {
Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_MAP_TYPE);
assertEquals(MAP_ARRAY_TYPE, beamSchema);
}

@Test
public void testFromTableSchema_map_map() {
Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_MAP_TYPE, INFER_MAPS_OPTIONS);
assertEquals(MAP_MAP_TYPE, beamSchema);
}

@Test
public void testToBeamRow_flat() {
Row beamRow = BigQueryUtils.toBeamRow(FLAT_TYPE, BQ_FLAT_ROW);