Skip to content

Commit

Permalink
Issue #1046: Allow disabling automatic Map conversion (#1079)
Browse files Browse the repository at this point in the history
  • Loading branch information
isha97 committed Oct 5, 2023
1 parent 350d93e commit deaa625
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Issue #144: allow writing Spark String to BQ TIME type
* Issue #867: Support writing with RangePartitioning
* Issue #1062: Adding dataproc job ID and UUID labels to BigQuery jobs
* Issue #1046: Add `allowMapTypeConversion` option to disable automatic Map type conversion
* PR #1008: Adding support to expose BigQuery metrics using Spark custom metrics API.
* PR #1038: Logical plan now shows the BigQuery table of DirectBigQueryRelation. Thanks @idc101 !
* PR #1058: View names will appear in query plan instead of the materialized table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ StructField convert(Field field) {
}

Optional<StructField> convertMap(Field field, Metadata metadata) {
if (!configuration.getAllowMapTypeConversion()) {
return Optional.empty();
}
if (field.getMode() != Field.Mode.REPEATED) {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,56 @@ public class SchemaConvertersConfiguration implements Serializable {

private final ZoneId datetimeZoneId;

private SchemaConvertersConfiguration(ZoneId datetimeZoneId) {
private final boolean allowMapTypeConversion;

private SchemaConvertersConfiguration(ZoneId datetimeZoneId, boolean allowMapTypeConversion) {
this.datetimeZoneId = datetimeZoneId;
this.allowMapTypeConversion = allowMapTypeConversion;
}

public static SchemaConvertersConfiguration from(SparkBigQueryConfig config) {
return SchemaConvertersConfiguration.of(config.getDatetimeZoneId());
return SchemaConvertersConfiguration.of(
config.getDatetimeZoneId(), config.getAllowMapTypeConversion());
}

public static SchemaConvertersConfiguration of(@Nonnull ZoneId datetimeZoneId) {
return new SchemaConvertersConfiguration(datetimeZoneId);
return new SchemaConvertersConfiguration(datetimeZoneId, true);
}

public static SchemaConvertersConfiguration of(
@Nonnull ZoneId datetimeZoneId, @Nonnull boolean allowMapTypeConversion) {
return new SchemaConvertersConfiguration(datetimeZoneId, allowMapTypeConversion);
}

public ZoneId getDatetimeZoneId() {
return datetimeZoneId;
}

public boolean getAllowMapTypeConversion() {
return allowMapTypeConversion;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SchemaConvertersConfiguration that = (SchemaConvertersConfiguration) o;
return Objects.equal(datetimeZoneId, that.datetimeZoneId);
return Objects.equal(datetimeZoneId, that.datetimeZoneId)
&& Objects.equal(allowMapTypeConversion, that.allowMapTypeConversion);
}

@Override
public int hashCode() {
return Objects.hashCode(datetimeZoneId);
return Objects.hashCode(datetimeZoneId, allowMapTypeConversion);
}

@Override
public String toString() {
return "SchemaConvertersConfiguration{" + "datetimeZoneId=" + datetimeZoneId + '}';
return "SchemaConvertersConfiguration{"
+ "datetimeZoneId="
+ datetimeZoneId
+ ",allowMapTypeConversion="
+ allowMapTypeConversion
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ public static WriteMethod from(@Nullable String writeMethod) {
static final String BIGQUERY_TABLE_LABEL_PREFIX = "bigQueryTableLabel.";
public static final Priority DEFAULT_JOB_PRIORITY = Priority.INTERACTIVE;

  // User-facing option key; when set to "false", key/value REPEATED RECORD fields are read as
  // arrays of structs instead of Spark MapType (issue #1046).
  static final String ALLOW_MAP_TYPE_CONVERSION = "allowMapTypeConversion";

  // Map conversion stays enabled unless the user explicitly opts out.
  private static final Boolean ALLOW_MAP_TYPE_CONVERSION_DEFAULT = true;

TableId tableId;
// as the config needs to be Serializable, internally it uses
// com.google.common.base.Optional<String> but externally it uses the regular java.util.Optional
Expand Down Expand Up @@ -222,6 +226,8 @@ public static WriteMethod from(@Nullable String writeMethod) {

private com.google.common.base.Optional<String> destinationTableKmsKeyName = empty();

private boolean allowMapTypeConversion = ALLOW_MAP_TYPE_CONVERSION_DEFAULT;

@VisibleForTesting
SparkBigQueryConfig() {
// empty
Expand Down Expand Up @@ -549,6 +555,11 @@ public static SparkBigQueryConfig from(
config.destinationTableKmsKeyName =
getAnyOption(globalOptions, options, "destinationTableKmsKeyName");

config.allowMapTypeConversion =
getAnyOption(globalOptions, options, ALLOW_MAP_TYPE_CONVERSION)
.transform(Boolean::valueOf)
.or(ALLOW_MAP_TYPE_CONVERSION_DEFAULT);

return config;
}

Expand Down Expand Up @@ -985,6 +996,10 @@ public ImmutableMap<String, String> getBigQueryJobLabels() {
return bigQueryJobLabels;
}

public boolean getAllowMapTypeConversion() {
return allowMapTypeConversion;
}

public ImmutableMap<String, String> getBigQueryTableLabels() {
return bigQueryTableLabels;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.connector.common.BigQueryUtil;
import java.time.ZoneId;
import java.util.Optional;
import org.apache.spark.ml.linalg.SQLDataTypes;
import org.apache.spark.sql.types.*;
Expand Down Expand Up @@ -193,6 +194,16 @@ private void VerifyDecimalConversion(int precision, int scale, LegacySQLTypeName
assertThat(field.getScale()).isEqualTo(scale);
}

private Field getKeyValueRepeatedField() {
return Field.newBuilder(
"foo",
LegacySQLTypeName.RECORD,
Field.of("key", LegacySQLTypeName.INTEGER),
Field.of("value", LegacySQLTypeName.STRING))
.setMode(Mode.REPEATED)
.build();
}

@Test
public void testFailureOnTooWideBigNumericConversion() throws Exception {
assertThrows(
Expand Down Expand Up @@ -375,22 +386,36 @@ public void testConvertBigQueryMapToSparkMap_wrong_record_fields() {
public void testConvertBigQueryMapToSparkMap_with_actual_map() {
Optional<StructField> fieldOpt =
SchemaConverters.from(SCHEMA_CONVERTERS_CONFIGURATION)
.convertMap(
Field.newBuilder(
"foo",
LegacySQLTypeName.RECORD,
Field.of("key", LegacySQLTypeName.INTEGER),
Field.of("value", LegacySQLTypeName.STRING))
.setMode(Mode.REPEATED)
.build(),
Metadata.empty());
.convertMap(getKeyValueRepeatedField(), Metadata.empty());
MapType longToStringMapType = DataTypes.createMapType(DataTypes.LongType, DataTypes.StringType);
assertThat(fieldOpt.isPresent()).isTrue();
StructField field = fieldOpt.get();
assertThat(field.dataType()).isEqualTo(longToStringMapType);
assertThat(field.name()).isEqualTo("foo");
}

@Test
public void testConvertBigQueryMapToSparkMap_mapTypeConversionDisabled() {
Optional<StructField> fieldOpt =
SchemaConverters.from(SchemaConvertersConfiguration.of(ZoneId.of("UTC"), false))
.convertMap(getKeyValueRepeatedField(), Metadata.empty());
assertThat(fieldOpt.isPresent()).isFalse();
}

@Test
public void testConvertBigQueryToSparkArray_mapTypeConversionDisabled() {
StructField field =
SchemaConverters.from(SchemaConvertersConfiguration.of(ZoneId.of("UTC"), false))
.convert(getKeyValueRepeatedField());
StructType elementType =
new StructType()
.add("key", DataTypes.LongType, true)
.add("value", DataTypes.StringType, true);
ArrayType arrayType = new ArrayType(elementType, true);
assertThat(field.dataType()).isEqualTo(arrayType);
assertThat(field.name()).isEqualTo("foo");
}

@Test
public void testCreateDecimalTypeFromNumericField() throws Exception {
// new builder instance is needed for each test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public void testDefaults() {
assertThat(config.getDatetimeZoneId()).isEqualTo(ZoneId.of("UTC"));
assertThat(config.getQueryJobPriority()).isEqualTo(SparkBigQueryConfig.DEFAULT_JOB_PRIORITY);
assertThat(config.getKmsKeyName()).isEqualTo(Optional.empty());
assertThat(config.getAllowMapTypeConversion()).isTrue();
}

@Test
Expand Down Expand Up @@ -176,6 +177,7 @@ public void testConfigFromOptions() {
.put("datetimeZoneId", "Asia/Jerusalem")
.put("queryJobPriority", "batch")
.put("destinationTableKmsKeyName", "some/key/name")
.put("allowMapTypeConversion", "false")
.build());
SparkBigQueryConfig config =
SparkBigQueryConfig.from(
Expand Down Expand Up @@ -228,6 +230,7 @@ public void testConfigFromOptions() {
assertThat(config.getDatetimeZoneId()).isEqualTo(ZoneId.of("Asia/Jerusalem"));
assertThat(config.getQueryJobPriority()).isEqualTo(Priority.valueOf("BATCH"));
assertThat(config.getKmsKeyName()).isEqualTo(Optional.of("some/key/name"));
assertThat(config.getAllowMapTypeConversion()).isFalse();
}

@Test
Expand Down

0 comments on commit deaa625

Please sign in to comment.