Skip to content

Commit

Permalink
[HUDI-3523] Introduce AddPrimitiveColumnSchemaPostProcessor to support adding a new primitive column to the end of a schema
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxianghu committed Aug 27, 2022
1 parent 797e7a6 commit 25773ca
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 13 deletions.
Expand Up @@ -45,7 +45,7 @@
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.schema.ChainedSchemaPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.ChainedSchemaPostProcessor;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
Expand Down
Expand Up @@ -16,11 +16,12 @@
* limitations under the License.
*/

package org.apache.hudi.utilities.schema;
package org.apache.hudi.utilities.schema.postprocessor;

import org.apache.hudi.common.config.TypedProperties;

import org.apache.avro.Schema;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.List;
Expand Down
Expand Up @@ -16,12 +16,13 @@
* limitations under the License.
*/

package org.apache.hudi.utilities.schema;
package org.apache.hudi.utilities.schema.postprocessor;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;

import org.apache.avro.Schema;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
Expand Down
Expand Up @@ -16,13 +16,14 @@
* limitations under the License.
*/

package org.apache.hudi.utilities.schema;
package org.apache.hudi.utilities.schema.postprocessor;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;

import org.apache.avro.Schema;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
Expand Down
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.schema.postprocessor.add;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;

import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/**
* A {@link SchemaPostProcessor} used to add a new column of primitive types to given schema. Only supports adding one
* column at a time.
* <p>
* The new column will be appended to the end.
* <p>
* TODO support complex types.
*/
public class AddPrimitiveColumnSchemaPostProcessor extends SchemaPostProcessor {

  public AddPrimitiveColumnSchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) {
    super(props, jssc);
  }

  /**
   * Returns a copy of {@code schema} with one new primitive column appended at the end.
   *
   * @param schema source record schema to extend
   * @return a new record schema containing all source fields plus the configured column
   * @throws HoodieSchemaPostProcessException if a field with the configured name already exists
   */
  @Override
  public Schema processSchema(Schema schema) {
    // Read the column name once and reuse it for both the duplicate check and the new field.
    String newColumnName = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());

    if (schema.getField(newColumnName) != null) {
      throw new HoodieSchemaPostProcessException(String.format("Column %s already exist!", newColumnName));
    }

    List<Schema.Field> sourceFields = schema.getFields();
    List<Schema.Field> targetFields = new ArrayList<>(sourceFields.size() + 1);

    // Avro Schema.Field instances cannot be reused across schemas, so each one is re-created.
    // NOTE(review): field aliases and custom properties are not copied — confirm none are relied on.
    for (Schema.Field sourceField : sourceFields) {
      targetFields.add(new Schema.Field(sourceField.name(), sourceField.schema(), sourceField.doc(), sourceField.defaultVal()));
    }

    // add new column to the end
    targetFields.add(buildNewColumn(newColumnName));

    // Preserve the source schema's isError flag instead of hard-coding false.
    return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError(), targetFields);
  }

  /**
   * Builds the new field from the {@link BaseSchemaPostProcessorConfig} properties.
   *
   * @param columnName name of the column to create (already validated as absent from the schema)
   * @return the new Avro field, wrapped in a {@code [null, type]} union when nullable
   */
  private Schema.Field buildNewColumn(String columnName) {
    // Avro type names are lower case; normalize the configured value with a fixed locale.
    String type = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key()).toUpperCase(Locale.ROOT);
    String doc = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), null);
    Object defaultValue = this.config.getOrDefault(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP.key(),
        null);
    boolean nullable = this.config.getBoolean(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.key(),
        BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.defaultValue());

    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(columnName));
    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(type));
    // A bare "null" column carries no data; reject it explicitly.
    ValidationUtils.checkArgument(!Schema.Type.NULL.getName().equals(type));

    Schema newSchema = createSchema(type, nullable);

    // NOTE(review): for a nullable column, Avro expects the default to match the first union
    // branch (null); a non-null default combined with nullable=true may be rejected downstream.
    return new Schema.Field(columnName, newSchema, doc, defaultValue);
  }

  /**
   * Creates the column schema for the given primitive type name, optionally wrapped
   * in a {@code [null, type]} union to make the column nullable.
   */
  private Schema createSchema(String type, boolean nullable) {
    Schema schema = Schema.create(Schema.Type.valueOf(type));
    if (nullable) {
      schema = Schema.createUnion(Schema.create(Schema.Type.NULL), schema);
    }
    return schema;
  }

}
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.schema.postprocessor.add;

import org.apache.hudi.common.config.ConfigProperty;

/**
* Base configs to describe a primitive type column.
*/
public class BaseSchemaPostProcessorConfig {

  /** Name of the column to add. Required; no default. */
  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.name")
      .noDefaultValue()
      .withDocumentation("New column's name");

  /** Avro primitive type of the column to add. Required; no default. */
  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.type")
      .noDefaultValue()
      .withDocumentation("New column's type");

  /** Whether the new column is nullable. Defaults to true (column schema becomes a union with null). */
  public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.nullable")
      .defaultValue(true)
      .withDocumentation("New column's nullable");

  /** Default value for the new column. Optional. */
  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.default")
      .noDefaultValue()
      .withDocumentation("New column's default value");

  /** Documentation string attached to the new column. Optional. */
  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.doc")
      .noDefaultValue()
      .withDocumentation("Docs about new column");

  /** Constants holder; not meant to be instantiated. */
  private BaseSchemaPostProcessorConfig() {
  }

}
Expand Up @@ -20,25 +20,32 @@

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor;
import org.apache.hudi.utilities.schema.DropColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.add.AddPrimitiveColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.DropColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.add.BaseSchemaPostProcessorConfig;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.transform.FlatteningTransformer;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

Expand All @@ -55,13 +62,18 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
+ "{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\","
+ "\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"}]}";

private static Stream<Arguments> configParams() {
String[] types = {"bytes", "string", "int", "long", "float", "double", "boolean"};
return Stream.of(types).map(Arguments::of);
}

@Test
public void testPostProcessor() throws IOException {
properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, DummySchemaPostProcessor.class.getName());
SchemaProvider provider =
UtilHelpers.wrapSchemaProviderWithPostProcessor(
UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
properties, jsc,null);
UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
properties, jsc, null);

Schema schema = provider.getSourceSchema();
assertEquals(schema.getType(), Type.RECORD);
Expand All @@ -76,9 +88,9 @@ public void testSparkAvro() throws IOException {
transformerClassNames.add(FlatteningTransformer.class.getName());

SchemaProvider provider =
UtilHelpers.wrapSchemaProviderWithPostProcessor(
UtilHelpers.createSchemaProvider(SparkAvroSchemaProvider.class.getName(), properties, jsc),
properties, jsc, transformerClassNames);
UtilHelpers.wrapSchemaProviderWithPostProcessor(
UtilHelpers.createSchemaProvider(SparkAvroSchemaProvider.class.getName(), properties, jsc),
properties, jsc, transformerClassNames);

Schema schema = provider.getSourceSchema();
assertEquals(schema.getType(), Type.RECORD);
Expand All @@ -99,7 +111,7 @@ public void testDeleteSupport() {
public void testChainedSchemaPostProcessor() {
// DeleteSupportSchemaPostProcessor first, DummySchemaPostProcessor second
properties.put(Config.SCHEMA_POST_PROCESSOR_PROP,
"org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor,org.apache.hudi.utilities.DummySchemaPostProcessor");
"org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor,org.apache.hudi.utilities.DummySchemaPostProcessor");

SchemaPostProcessor processor = UtilHelpers.createSchemaPostProcessor(properties.getString(Config.SCHEMA_POST_PROCESSOR_PROP), properties, jsc);
Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
Expand All @@ -111,7 +123,7 @@ public void testChainedSchemaPostProcessor() {

// DummySchemaPostProcessor first, DeleteSupportSchemaPostProcessor second
properties.put(Config.SCHEMA_POST_PROCESSOR_PROP,
"org.apache.hudi.utilities.DummySchemaPostProcessor,org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor");
"org.apache.hudi.utilities.DummySchemaPostProcessor,org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor");

processor = UtilHelpers.createSchemaPostProcessor(properties.getString(Config.SCHEMA_POST_PROCESSOR_PROP), properties, jsc);
schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
Expand Down Expand Up @@ -144,6 +156,32 @@ public void testDeleteColumnThrows() {
Assertions.assertThrows(HoodieSchemaPostProcessException.class, () -> processor.processSchema(schema));
}

@ParameterizedTest
@MethodSource("configParams")
public void testAddPrimitiveTypeColumn(String type) {
properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), "primitive_column");
properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), type);
properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), "primitive column test");

AddPrimitiveColumnSchemaPostProcessor processor = new AddPrimitiveColumnSchemaPostProcessor(properties, null);
Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
Schema targetSchema = processor.processSchema(schema);

Schema.Field newColumn = targetSchema.getField("primitive_column");

assertNotNull(newColumn);
assertEquals("primitive column test", newColumn.doc());
// nullable by default, so new column is union type
assertNotEquals(type, newColumn.schema().getType().getName());

// test not nullable
properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.key(), false);
targetSchema = processor.processSchema(schema);
newColumn = targetSchema.getField("primitive_column");
assertEquals(type, newColumn.schema().getType().getName());

}

@Test
public void testSparkAvroSchema() throws IOException {
SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null);
Expand Down

0 comments on commit 25773ca

Please sign in to comment.