[AMORO-2404] Fix Mixed Hive table mistakenly deleting Hive files during snapshot expiration (apache#2405)

Return the URI path from getHiveLocation so that Hive locations compare correctly against file paths during expiration (see the sketch below).
wangtaohz authored and ShawHee committed Dec 29, 2023
1 parent 894018c commit ba09fdb
Showing 3 changed files with 32 additions and 47 deletions.
File 1 of 3: HiveLocationUtil.java
@@ -21,18 +21,15 @@
import com.netease.arctic.hive.table.SupportHive;
import com.netease.arctic.hive.utils.TableTypeUtil;
import com.netease.arctic.table.ArcticTable;
+import com.netease.arctic.utils.TableFileUtil;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class HiveLocationUtil {
-  private static final Logger LOG = LoggerFactory.getLogger(HiveLocationUtil.class);

/**
* Get the Hive table/partition locations of the given table
*
@@ -50,7 +47,7 @@ public static Set<String> getHiveLocation(ArcticTable table) {
.run(
client ->
client.getTable(table.id().getDatabase(), table.id().getTableName()));
-hiveLocations.add(hiveTable.getSd().getLocation());
+hiveLocations.add(TableFileUtil.getUriPath(hiveTable.getSd().getLocation()));
} catch (Exception e) {
throw new IllegalStateException("Failed to get hive table location", e);
}
@@ -66,7 +63,7 @@ public static Set<String> getHiveLocation(ArcticTable table) {
table.id().getTableName(),
Short.MAX_VALUE));
for (Partition partition : partitions) {
-hiveLocations.add(partition.getSd().getLocation());
+hiveLocations.add(TableFileUtil.getUriPath(partition.getSd().getLocation()));
}
} catch (Exception e) {
throw new IllegalStateException("Failed to get hive partition locations", e);
File 2 of 3: TestSnapshotExpireHive.java
@@ -24,21 +24,24 @@
import com.netease.arctic.hive.TestHMS;
import com.netease.arctic.hive.catalog.HiveCatalogTestHelper;
import com.netease.arctic.hive.catalog.HiveTableTestHelper;
+import com.netease.arctic.hive.io.HiveDataTestHelpers;
+import com.netease.arctic.hive.utils.HiveTableUtil;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.table.UnkeyedTable;
-import com.netease.arctic.utils.TableFileUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

-import java.util.HashSet;
import java.util.List;
import java.util.Set;

@RunWith(Parameterized.class)
public class TestSnapshotExpireHive extends TestSnapshotExpire {
@@ -74,8 +77,8 @@ public TestSnapshotExpireHive(

@Test
public void testExpireTableFiles() {
-List<DataFile> hiveFiles = writeAndCommitBaseAndHive(getArcticTable(), 1, true);
-List<DataFile> s2Files = writeAndCommitBaseAndHive(getArcticTable(), 1, false);
+List<DataFile> hiveFiles = writeAndReplaceHivePartitions(getArcticTable());
+List<DataFile> s2Files = writeAndCommitBaseStore(getArcticTable());

DeleteFiles deleteHiveFiles =
isKeyedTable()
@@ -104,27 +107,16 @@ public void testExpireTableFiles() {
}
deleteIcebergFiles.commit();

-List<DataFile> s3Files = writeAndCommitBaseAndHive(getArcticTable(), 1, false);
+List<DataFile> s3Files = writeAndCommitBaseStore(getArcticTable());
s3Files.forEach(
file -> Assert.assertTrue(getArcticTable().io().exists(file.path().toString())));

-Set<String> hiveLocation = new HashSet<>();
-String partitionHiveLocation = hiveFiles.get(0).path().toString();
-hiveLocation.add(TableFileUtil.getUriPath(TableFileUtil.getFileDir(partitionHiveLocation)));
-if (isPartitionedTable()) {
-  String anotherHiveLocation =
-      partitionHiveLocation.contains("op_time_day=2022-01-01")
-          ? partitionHiveLocation.replace("op_time_day=2022-01-01", "op_time_day=2022-01-02")
-          : partitionHiveLocation.replace("op_time_day=2022-01-02", "op_time_day=2022-01-01");
-  hiveLocation.add(TableFileUtil.getUriPath(TableFileUtil.getFileDir(anotherHiveLocation)));
-}
UnkeyedTable unkeyedTable =
isKeyedTable()
? getArcticTable().asKeyedTable().baseTable()
: getArcticTable().asUnkeyedTable();
-new MixedTableMaintainer(unkeyedTable)
-    .getBaseMaintainer()
-    .expireSnapshots(System.currentTimeMillis(), hiveLocation);
+MixedTableMaintainer mixedTableMaintainer = new MixedTableMaintainer(getArcticTable());
+mixedTableMaintainer.getBaseMaintainer().expireSnapshots(System.currentTimeMillis());
Assert.assertEquals(1, Iterables.size(unkeyedTable.snapshots()));

hiveFiles.forEach(
@@ -134,4 +126,22 @@ public void testExpireTableFiles() {
s3Files.forEach(
file -> Assert.assertTrue(getArcticTable().io().exists(file.path().toString())));
}

+public List<DataFile> writeAndReplaceHivePartitions(ArcticTable table) {
+  String hiveSubDir = HiveTableUtil.newHiveSubdirectory();
+  HiveDataTestHelpers.WriterHelper writerHelper =
+      HiveDataTestHelpers.writerOf(table).customHiveLocation(hiveSubDir).transactionId(0L);
+  List<Record> records = createRecords(1, 100);
+  List<DataFile> dataFiles = writerHelper.writeHive(records);
+
+  // Use replace-partitions to point the Hive table or its partitions at the new location.
+  UnkeyedTable baseTable =
+      table.isKeyedTable() ? table.asKeyedTable().baseTable() : table.asUnkeyedTable();
+  ReplacePartitions replacePartitions = baseTable.newReplacePartitions();
+  dataFiles.forEach(replacePartitions::addFile);
+  replacePartitions.commit();
+  // The data files are written with a '.' prefix in their names; it is removed on commit.
+  return Lists.newArrayList(baseTable.currentSnapshot().addedDataFiles(table.io()));
+}
}
File 3 of 3 (shared test base class)
@@ -22,8 +22,6 @@
import com.netease.arctic.catalog.CatalogTestHelper;
import com.netease.arctic.catalog.TableTestBase;
import com.netease.arctic.data.ChangeAction;
-import com.netease.arctic.hive.io.HiveDataTestHelpers;
-import com.netease.arctic.hive.utils.HiveTableUtil;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.KeyedTable;
import com.netease.arctic.table.UnkeyedTable;
@@ -78,26 +76,6 @@ public List<DataFile> writeAndCommitChangeStore(
return writeFiles;
}

-public List<DataFile> writeAndCommitBaseAndHive(ArcticTable table, long txId, boolean writeHive) {
-  String hiveSubDir = HiveTableUtil.newHiveSubdirectory(txId);
-  HiveDataTestHelpers.WriterHelper writerHelper =
-      HiveDataTestHelpers.writerOf(table).customHiveLocation(hiveSubDir).transactionId(txId);
-  List<Record> records = createRecords(1, 100);
-  List<DataFile> dataFiles;
-  if (writeHive) {
-    dataFiles = writerHelper.writeHive(records);
-  } else {
-    dataFiles = writerHelper.writeBase(records);
-  }
-
-  UnkeyedTable baseTable =
-      table.isKeyedTable() ? table.asKeyedTable().baseTable() : table.asUnkeyedTable();
-  AppendFiles baseAppend = baseTable.newAppend();
-  dataFiles.forEach(baseAppend::appendFile);
-  baseAppend.commit();
-  return dataFiles;
-}

public void writeAndCommitBaseAndChange(ArcticTable table) {
writeAndCommitBaseStore(table);
writeAndCommitChangeStore(
