Skip to content

Commit

Permalink
[ARCTIC-497] Fix: when adding a column to a keyed hive table with partitio…
Browse files Browse the repository at this point in the history
…ns, the schema column order of the change table and the base table differs (#501)

* add HiveChangeInternalTable and BaseSchemaUpdate

* fix code style
  • Loading branch information
hameizi committed Oct 20, 2022
1 parent 33e5b4c commit 5a7c908
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ protected KeyedHiveTable loadKeyedTable(TableMeta tableMeta) {
fileIO, tableLocation, client, hiveClientPool, false);

Table changeIcebergTable = tableMetaStore.doAs(() -> tables.load(changeLocation));
ChangeTable changeTable = new BaseKeyedTable.ChangeInternalTable(tableIdentifier,
ChangeTable changeTable = new KeyedHiveTable.HiveChangeInternalTable(tableIdentifier,
useArcticTableOperations(changeIcebergTable, changeLocation, fileIO, tableMetaStore.getConfiguration()),
fileIO, client);
return new KeyedHiveTable(tableMeta, tableLocation,
Expand Down Expand Up @@ -303,7 +303,7 @@ protected KeyedHiveTable createKeyedTable(TableMeta meta) {
throw new IllegalStateException("create change table failed", e);
}
});
ChangeTable changeTable = new BaseKeyedTable.ChangeInternalTable(tableIdentifier,
ChangeTable changeTable = new KeyedHiveTable.HiveChangeInternalTable(tableIdentifier,
useArcticTableOperations(changeIcebergTable, changeLocation, fileIO, tableMetaStore.getConfiguration()),
fileIO, client);

Expand Down
156 changes: 156 additions & 0 deletions hive/src/main/java/com/netease/arctic/hive/op/BaseSchemaUpdate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.hive.op;

import com.netease.arctic.table.ArcticTable;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Schema;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.util.Collection;

/**
 * {@link UpdateSchema} wrapper that restricts schema evolution to a small set of operations and
 * keeps newly added top-level columns in front of the partition source columns.
 *
 * <p>Forwarded to the wrapped {@link UpdateSchema}: {@link #apply()}, {@link #commit()},
 * {@code addColumn}, {@code updateColumn} and {@code updateColumnDoc}. Every other operation
 * throws {@link UnsupportedOperationException}.
 */
public class BaseSchemaUpdate implements UpdateSchema {

  private final ArcticTable arcticTable;
  private final UpdateSchema updateSchema;

  /**
   * @param arcticTable table whose partition spec and schema decide where added columns are placed
   * @param updateSchema delegate that records and commits the supported changes
   */
  public BaseSchemaUpdate(ArcticTable arcticTable, UpdateSchema updateSchema) {
    this.arcticTable = arcticTable;
    this.updateSchema = updateSchema;
  }

  /** Returns the result of the delegate's {@code apply()}. */
  @Override
  public Schema apply() {
    return this.updateSchema.apply();
  }

  /** Commits the pending changes through the delegate. */
  @Override
  public void commit() {
    this.updateSchema.commit();
  }

  /**
   * @throws UnsupportedOperationException always
   */
  @Override
  public UpdateSchema allowIncompatibleChanges() {
    throw new UnsupportedOperationException("hive table not support allowIncompatibleChanges");
  }

  /**
   * Adds a top-level column through the delegate and, for partitioned tables, moves it before
   * the first partition source column.
   *
   * @return this instance, for chaining
   */
  @Override
  public UpdateSchema addColumn(String name, Type type, String doc) {
    this.updateSchema.addColumn(name, type, doc);
    moveColBeforePar(name);
    return this;
  }

  /**
   * Adds a column under {@code parent} through the delegate. The column is repositioned only
   * when {@code parent} is {@code null}, i.e. when it is added at the top level.
   *
   * @return this instance, for chaining
   */
  @Override
  public UpdateSchema addColumn(String parent, String name, Type type, String doc) {
    this.updateSchema.addColumn(parent, name, type, doc);
    if (parent == null) {
      moveColBeforePar(name);
    }
    return this;
  }

  /**
   * @throws UnsupportedOperationException always
   */
  @Override
  public UpdateSchema addRequiredColumn(String name, Type type, String doc) {
    throw new UnsupportedOperationException("hive table not support addRequiredColumn");
  }

  /**
   * @throws UnsupportedOperationException always
   */
  @Override
  public UpdateSchema addRequiredColumn(String parent, String name, Type type, String doc) {
    throw new UnsupportedOperationException("hive table not support addRequiredColumn");
  }

  /**
   * @throws UnsupportedOperationException always
   */
  @Override
  public UpdateSchema renameColumn(String name, String newName) {
    throw new UnsupportedOperationException("not support renameColumn now, there will be error when hive stored as " +
        "parquet and we rename the column");
  }

  /**
   * Forwards the type change to the delegate.
   *
   * @return this instance, for chaining
   */
  @Override
  public UpdateSchema updateColumn(String name, Type.PrimitiveType newType) {
    this.updateSchema.updateColumn(name, newType);
    return this;
  }

  /**
   * Forwards the doc change to the delegate.
   *
   * @return this instance, for chaining
   */
  @Override
  public UpdateSchema updateColumnDoc(String name, String newDoc) {
    this.updateSchema.updateColumnDoc(name, newDoc);
    return this;
  }

  /**
   * @throws UnsupportedOperationException always
   */
  @Override
  public UpdateSchema makeColumnOptional(String name) {
    throw new UnsupportedOperationException("hive table not support makeColumnOptional");
  }

  /**
   * @throws UnsupportedOperationException always
   */
  @Override
  public UpdateSchema requireColumn(String name) {
    throw new UnsupportedOperationException("hive table not support requireColumn");
  }

  /**
   * @throws UnsupportedOperationException always
   */
  @Override
  public UpdateSchema deleteColumn(String name) {
    throw new UnsupportedOperationException("hive table not support deleteColumn");
  }

  /**
   * @throws UnsupportedOperationException always
   */
  @Override
  public UpdateSchema moveFirst(String name) {
    throw new UnsupportedOperationException("hive table not support moveFirst");
  }

  /**
   * @throws UnsupportedOperationException always
   */
  @Override
  public UpdateSchema moveBefore(String name, String beforeName) {
    throw new UnsupportedOperationException("hive table not support moveBefore");
  }

  /**
   * @throws UnsupportedOperationException always
   */
  @Override
  public UpdateSchema moveAfter(String name, String afterName) {
    throw new UnsupportedOperationException("hive table not support moveAfter");
  }

  /**
   * @throws UnsupportedOperationException always
   */
  @Override
  public UpdateSchema unionByNameWith(Schema newSchema) {
    throw new UnsupportedOperationException("hive table not support unionByNameWith");
  }

  /**
   * @throws UnsupportedOperationException always
   */
  @Override
  public UpdateSchema setIdentifierFields(Collection<String> names) {
    throw new UnsupportedOperationException("hive table not support setIdentifierFields");
  }

  /**
   * Moves column {@code name} in front of the partition source column that appears earliest in
   * the table's top-level column list. Does nothing for unpartitioned tables or when no
   * partition source column is found at the top level.
   *
   * <p>It is strictly required that all non-partitioned columns precede partitioned columns in
   * the schema.
   */
  private void moveColBeforePar(String name) {
    if (arcticTable.spec().isUnpartitioned()) {
      return;
    }
    // Read the schema once so every partition field is ranked against the same column list.
    Schema schema = arcticTable.schema();
    int parFieldMinIndex = Integer.MAX_VALUE;
    Types.NestedField firstParField = null;
    for (PartitionField partitionField : arcticTable.spec().fields()) {
      Types.NestedField sourceField = schema.findField(partitionField.sourceId());
      int sourceIndex = schema.columns().indexOf(sourceField);
      // indexOf returns -1 when the source is not a top-level column; such an entry must not
      // win the minimum, otherwise it would hide the real first partition column.
      if (sourceIndex >= 0 && sourceIndex < parFieldMinIndex) {
        parFieldMinIndex = sourceIndex;
        firstParField = sourceField;
      }
    }
    if (firstParField != null) {
      this.updateSchema.moveBefore(name, firstParField.name());
    }
  }
}
120 changes: 2 additions & 118 deletions hive/src/main/java/com/netease/arctic/hive/op/HiveSchemaUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,26 @@
import com.netease.arctic.table.KeyedTable;
import com.netease.arctic.table.TableProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Schema;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;

import java.util.Collection;
import java.util.Locale;

/**
* Schema evolution API implementation for {@link KeyedTable}.
*/
public class HiveSchemaUpdate implements UpdateSchema {
public class HiveSchemaUpdate extends BaseSchemaUpdate {
private final ArcticTable arcticTable;
private final HMSClientPool hiveClient;
private final UpdateSchema updateSchema;

public HiveSchemaUpdate(ArcticTable arcticTable, HMSClientPool hiveClient, UpdateSchema updateSchema) {
super(arcticTable, updateSchema);
this.arcticTable = arcticTable;
this.hiveClient = hiveClient;
this.updateSchema = updateSchema;
}

@Override
public Schema apply() {
return this.updateSchema.apply();
}

@Override
public void commit() {
this.updateSchema.commit();
Expand All @@ -72,111 +63,4 @@ private void syncSchemaToHive() {
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT).toUpperCase(Locale.ENGLISH))));
HiveTableUtil.persistTable(hiveClient, tbl);
}

@Override
public UpdateSchema allowIncompatibleChanges() {
throw new UnsupportedOperationException("hive table not support allowIncompatibleChanges");
}

@Override
public UpdateSchema addColumn(String name, Type type, String doc) {
this.updateSchema.addColumn(name, type, doc);
moveColBeforePar(name);
return this;
}

@Override
public UpdateSchema addColumn(String parent, String name, Type type, String doc) {
this.updateSchema.addColumn(parent, name, type, doc);
if (parent == null) {
moveColBeforePar(name);
}
return this;
}

@Override
public UpdateSchema addRequiredColumn(String name, Type type, String doc) {
throw new UnsupportedOperationException("hive table not support addRequiredColumn");
}

@Override
public UpdateSchema addRequiredColumn(String parent, String name, Type type, String doc) {
throw new UnsupportedOperationException("hive table not support addRequiredColumn");
}

@Override
public UpdateSchema renameColumn(String name, String newName) {
throw new UnsupportedOperationException("not support renameColumn now, there will be error when hive stored as " +
"parquet and we rename the column");
}

@Override
public UpdateSchema updateColumn(String name, Type.PrimitiveType newType) {
this.updateSchema.updateColumn(name, newType);
return this;
}

@Override
public UpdateSchema updateColumnDoc(String name, String newDoc) {
this.updateSchema.updateColumnDoc(name, newDoc);
return this;
}

@Override
public UpdateSchema makeColumnOptional(String name) {
throw new UnsupportedOperationException("hive table not support makeColumnOptional");
}

@Override
public UpdateSchema requireColumn(String name) {
throw new UnsupportedOperationException("hive table not support requireColumn");
}

@Override
public UpdateSchema deleteColumn(String name) {
throw new UnsupportedOperationException("hive table not support deleteColumn");
}

@Override
public UpdateSchema moveFirst(String name) {
throw new UnsupportedOperationException("hive table not support moveFirst");
}

@Override
public UpdateSchema moveBefore(String name, String beforeName) {
throw new UnsupportedOperationException("hive table not support moveBefore");
}

@Override
public UpdateSchema moveAfter(String name, String afterName) {
throw new UnsupportedOperationException("hive table not support moveAfter");
}

@Override
public UpdateSchema unionByNameWith(Schema newSchema) {
throw new UnsupportedOperationException("hive table not support unionByNameWith");
}

@Override
public UpdateSchema setIdentifierFields(Collection<String> names) {
throw new UnsupportedOperationException("hive table not support setIdentifierFields");
}

//It is strictly required that all non-partitioned columns precede partitioned columns in the schema.
private void moveColBeforePar(String name) {
if (!arcticTable.spec().isUnpartitioned()) {
int parFieldMinIndex = Integer.MAX_VALUE;
Types.NestedField firstParField = null;
for (PartitionField partitionField : arcticTable.spec().fields()) {
Types.NestedField sourceField = arcticTable.schema().findField(partitionField.sourceId());
if (arcticTable.schema().columns().indexOf(sourceField) < parFieldMinIndex) {
parFieldMinIndex = arcticTable.schema().columns().indexOf(sourceField);
firstParField = sourceField;
}
}
if (firstParField != null) {
this.updateSchema.moveBefore(name, firstParField.name());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,16 @@
import com.netease.arctic.ams.api.TableMeta;
import com.netease.arctic.hive.HMSClientPool;
import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.op.BaseSchemaUpdate;
import com.netease.arctic.hive.utils.HiveMetaSynchronizer;
import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.table.BaseKeyedTable;
import com.netease.arctic.table.BaseUnkeyedTable;
import com.netease.arctic.table.ChangeTable;
import com.netease.arctic.table.PrimaryKeySpec;
import com.netease.arctic.table.TableIdentifier;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.util.PropertyUtil;

/**
Expand Down Expand Up @@ -84,4 +90,18 @@ private void syncHiveDataToArctic() {
/**
 * Returns the {@code hiveClient} held by this table.
 */
public HMSClientPool getHMSClient() {
  return this.hiveClient;
}

/**
 * {@link ChangeTable} built on {@link BaseUnkeyedTable} whose schema updates are wrapped in a
 * {@link BaseSchemaUpdate}.
 */
public static class HiveChangeInternalTable extends BaseUnkeyedTable implements ChangeTable {

  /**
   * Passes all arguments unchanged to the {@link BaseUnkeyedTable} constructor.
   */
  public HiveChangeInternalTable(
      TableIdentifier tableIdentifier,
      Table changeIcebergTable,
      ArcticFileIO arcticFileIO,
      AmsClient client) {
    super(tableIdentifier, changeIcebergTable, arcticFileIO, client);
  }

  /**
   * Wraps the parent's schema update in a {@link BaseSchemaUpdate} bound to this table.
   */
  @Override
  public UpdateSchema updateSchema() {
    UpdateSchema delegate = super.updateSchema();
    return new BaseSchemaUpdate(this, delegate);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public void testKeyedAdd() throws TException {
}
}
Assert.assertTrue(isExpect);
Assert.assertTrue(compareSchema(testKeyedHiveTable.changeTable().schema(), testKeyedHiveTable.spec(), fieldSchemas));
Assert.assertTrue(compareSchema(testKeyedHiveTable.schema(), testKeyedHiveTable.spec(), fieldSchemas));
}

Expand All @@ -64,6 +65,7 @@ public void testKeyedUpdate() throws TException {
}
}
Assert.assertTrue(isExpect);
Assert.assertTrue(compareSchema(testKeyedHiveTable.changeTable().schema(), testKeyedHiveTable.spec(), fieldSchemas));
Assert.assertTrue(compareSchema(testKeyedHiveTable.schema(), testKeyedHiveTable.spec(), fieldSchemas));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,15 @@ public void testCreateTableUppercase() {
Assert.assertEquals(3, rows.size());
sql("drop table if exists " + database + "." + "uppercase_table");
}

/**
 * Adds a column, deletes the row with {@code id = 3}, and checks that the remaining two rows
 * come back with ids 1 and 2 in order.
 */
@Test
public void testDeleteAfterAlter() {
  // Change the schema first so the delete runs against the altered table.
  sql("alter table {0}.{1} add column point bigint ", database, notUpsertTable);
  sql("delete from {0}.{1} where id = 3", database, notUpsertTable);

  rows = sql("select id, name from {0}.{1} order by id", database, notUpsertTable);

  final int expectedRowCount = 2;
  Assert.assertEquals(expectedRowCount, rows.size());
  // Rows are ordered by id, so position i must hold id i + 1.
  for (int position = 0; position < expectedRowCount; position++) {
    Assert.assertEquals(position + 1, rows.get(position)[0]);
  }
}
}

0 comments on commit 5a7c908

Please sign in to comment.