[WIP]Feature optimize support hive (#245)
* add rewrite to delete old position-delete

* remove pos-delete record count limit

* optimize support adapt hive

* optimize support hive and add full major optimize

* sync iceberg partition location to hive partition location

* adapt unit test to new struct

* add keyed unPartition table unit test

* add keyed unPartition table unit test

* merge hive sync arctic

* fix code review

* fix check style

* optimize unit test

* optimize code

* add latest full optimize time

* add update sql

* fix check style

* add file clean support hive

* fix unKeyed Full Major

* fix add twice

* fix check style

Co-authored-by: luting <ting.lt@dtwave-inc.com>
hzluting and luting committed Sep 1, 2022
1 parent a16f015 commit f0e6611
Showing 67 changed files with 3,584 additions and 623 deletions.


3 changes: 2 additions & 1 deletion ams/ams-api/src/main/thrift/arctic_optimize_manager.thrift
@@ -63,7 +63,8 @@ enum JobType {

enum OptimizeType {
Minor,
Major
Major,
FullMajor
}
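Since no explicit values are assigned, Thrift numbers these constants sequentially (Minor = 0, Major = 1), so the new FullMajor takes the next ordinal, 2. A stand-in Java mirror of the definition above, a sketch rather than the actual Thrift-generated class:

// Stand-in mirror of the Thrift enum above; Apache Thrift assigns implicit
// enum values sequentially starting at 0.
public enum OptimizeType {
  Minor,     // 0
  Major,     // 1
  FullMajor  // 2 -- new in this commit
}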


@@ -229,6 +229,7 @@ private static void startMetaStoreThreads(
startOptimizeCommit(conf.getInteger(ArcticMetaStoreConf.OPTIMIZE_COMMIT_THREAD_POOL_SIZE));
startExpiredClean();
startOrphanClean();
startSupportHiveSync();
monitorOptimizerStatus();
tableRuntimeDataExpire();
AmsRestServer.startRestServer(httpPort);
@@ -302,6 +303,14 @@ private static void startOrphanClean() {
TimeUnit.MILLISECONDS);
}

private static void startSupportHiveSync() {
ThreadPool.getPool(ThreadPool.Type.HIVE_SYNC).scheduleWithFixedDelay(
ServiceContainer.getSupportHiveSyncService()::checkHiveSyncTasks,
3 * 1000L,
60 * 1000L,
TimeUnit.MILLISECONDS);
}

private static void monitorOptimizerStatus() {
OptimizeExecuteService.OptimizerMonitor monitor = new OptimizeExecuteService.OptimizerMonitor();
ThreadPool.getPool(ThreadPool.Type.OPTIMIZER_MONITOR).scheduleWithFixedDelay(
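The new sync thread follows the same scheduleWithFixedDelay pattern as the expired-file and orphan-file cleaners: a 3-second initial delay, then 60 seconds between the end of one pass and the start of the next. A minimal self-contained sketch of that pattern, where a plain ScheduledExecutorService stands in for the project's ThreadPool and the printed message stands in for SupportHiveSyncService#checkHiveSyncTasks:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HiveSyncScheduleSketch {
  public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    pool.scheduleWithFixedDelay(
        () -> System.out.println("checking hive sync tasks..."),  // stand-in task body
        3 * 1000L,    // initial delay in ms, matching the diff above
        60 * 1000L,   // delay between runs in ms, matching the diff above
        TimeUnit.MILLISECONDS);
    // Runs until the JVM is stopped; the non-daemon pool keeps the process alive.
  }
}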
@@ -92,6 +92,12 @@ public class ArcticMetaStoreConf {
.defaultValue(10)
.withDescription("Number of threads in the thread pool. " +
"These will be used to execute all orphan file clean processes.");
public static final ConfigOption<Integer> SUPPORT_HIVE_SYNC_THREAD_POOL_SIZE =
ConfigOptions.key("arctic.ams.support.hive.sync.thread.pool-size")
.intType()
.defaultValue(10)
.withDescription("Number of threads in the thread pool. " +
"These will be used to execute all support hive sync processes.");
public static final ConfigOption<Integer> SYNC_FILE_INFO_CACHE_THREAD_POOL_SIZE =
ConfigOptions.key("arctic.ams.file.sync.thread.pool-size")
.intType()
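Like the other pool-size options in this class, the new key defaults to 10 and is read once at startup (startOptimizeCommit above shows the conf.getInteger pattern). A self-contained sketch of that lookup, using java.util.Properties as a stand-in for the project's Configuration class:

import java.util.Properties;

public class PoolSizeLookupSketch {
  public static void main(String[] args) {
    Properties conf = new Properties();  // stand-in for the AMS configuration object
    int poolSize = Integer.parseInt(
        conf.getProperty("arctic.ams.support.hive.sync.thread.pool-size", "10"));
    System.out.println("hive sync pool size = " + poolSize);  // 10 unless overridden
  }
}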
@@ -37,7 +37,7 @@ public interface OptimizeTasksMapper {

@Select("select trace_id, optimize_type, catalog_name, db_name, table_name, `partition`," +
" task_group, task_history_id, max_change_transaction_id, is_delete_pos_delete," +
" source_nodes, create_time, properties, queue_id, " +
" source_nodes, create_time, properties, queue_id," +
" insert_file_size, delete_file_size, base_file_size, pos_delete_file_size," +
" insert_files, delete_files, base_files, pos_delete_files" +
" from " + TABLE_NAME)
@@ -38,7 +38,7 @@ public interface TableOptimizeRuntimeMapper {
String TABLE_NAME = "optimize_table_runtime";

@Select("select catalog_name, db_name, table_name, current_snapshot_id, current_change_snapshotId," +
" latest_major_optimize_time, latest_minor_optimize_time, latest_task_history_id," +
" latest_major_optimize_time, latest_full_optimize_time, latest_minor_optimize_time, latest_task_history_id," +
" optimize_status, optimize_status_start_time " +
" from " + TABLE_NAME)
@ConstructorArgs({
@@ -51,6 +51,8 @@ public interface TableOptimizeRuntimeMapper {
@Result(property = "currentChangeSnapshotId", column = "current_change_snapshotId"),
@Result(property = "latestMajorOptimizeTime", column = "latest_major_optimize_time",
typeHandler = MapLong2StringConverter.class),
@Result(property = "latestFullOptimizeTime", column = "latest_full_optimize_time",
typeHandler = MapLong2StringConverter.class),
@Result(property = "latestMinorOptimizeTime", column = "latest_minor_optimize_time",
typeHandler = MapLong2StringConverter.class),
@Result(property = "latestTaskHistoryId", column = "latest_task_history_id"),
@@ -65,6 +67,8 @@ public interface TableOptimizeRuntimeMapper {
"current_change_snapshotId = #{runtime.currentChangeSnapshotId}, " +
"latest_major_optimize_time = #{runtime.latestMajorOptimizeTime, " +
"typeHandler=com.netease.arctic.ams.server.mybatis.MapLong2StringConverter}, " +
"latest_full_optimize_time = #{runtime.latestFullOptimizeTime, " +
"typeHandler=com.netease.arctic.ams.server.mybatis.MapLong2StringConverter}, " +
"latest_minor_optimize_time = #{runtime.latestMinorOptimizeTime, " +
"typeHandler=com.netease.arctic.ams.server.mybatis.MapLong2StringConverter}, " +
"latest_task_history_id = #{runtime.latestTaskHistoryId, jdbcType=VARCHAR}, " +
@@ -84,15 +88,17 @@ public interface TableOptimizeRuntimeMapper {
void deleteTableOptimizeRuntime(@Param("tableIdentifier") TableIdentifier tableIdentifier);

@Insert("insert into " + TABLE_NAME + " (catalog_name, db_name, table_name, " +
"current_snapshot_id, current_change_snapshotId, latest_major_optimize_time, latest_minor_optimize_time, " +
"latest_task_history_id, optimize_status, optimize_status_start_time) values ( " +
"current_snapshot_id, current_change_snapshotId, latest_major_optimize_time, latest_full_optimize_time, " +
"latest_minor_optimize_time, latest_task_history_id, optimize_status, optimize_status_start_time) values ( " +
"#{runtime.tableIdentifier.catalog}, " +
"#{runtime.tableIdentifier.database}, " +
"#{runtime.tableIdentifier.tableName}, " +
"#{runtime.currentSnapshotId}, " +
"#{runtime.currentChangeSnapshotId}, " +
"#{runtime.latestMajorOptimizeTime, " +
"typeHandler=com.netease.arctic.ams.server.mybatis.MapLong2StringConverter}," +
"#{runtime.latestFullOptimizeTime, " +
"typeHandler=com.netease.arctic.ams.server.mybatis.MapLong2StringConverter}," +
"#{runtime.latestMinorOptimizeTime, " +
"typeHandler=com.netease.arctic.ams.server.mybatis.MapLong2StringConverter}," +
"#{runtime.latestTaskHistoryId, jdbcType=VARCHAR}," +
@@ -41,6 +41,7 @@ public class BaseOptimizeTask extends OptimizeTask {
protected long createTime;

private long maxChangeTransactionId = INVALID_TRANSACTION_ID;
@Deprecated
private int isDeletePosDelete;

public BaseOptimizeTask() {
@@ -70,7 +71,6 @@ public void setMaxChangeTransactionId(long maxChangeTransactionId) {
this.maxChangeTransactionId = maxChangeTransactionId;
}


public String getPartition() {
return partition;
}
@@ -143,10 +143,12 @@ public void setDeleteFileCnt(int deleteFileCnt) {
this.deleteFileCnt = deleteFileCnt;
}

@Deprecated
public int getIsDeletePosDelete() {
return isDeletePosDelete;
}

@Deprecated
public void setIsDeletePosDelete(int isDeletePosDelete) {
this.isDeletePosDelete = isDeletePosDelete;
}
@@ -152,20 +152,20 @@ public void collectDeleteFiles(List<DataFile> collector) {
}

public void collectBaseFiles(List<DataFile> collector) {
collectBaseFiles(collector, false, 0);
collectBaseFiles(collector, false, Collections.emptyList());
}

public void collectBaseFiles(List<DataFile> collector, boolean isFilterSmallFile, long smallFileSize) {
if (isFilterSmallFile) {
public void collectBaseFiles(List<DataFile> collector, boolean isMajor, List<DataFile> needOptimizeFiles) {
if (isMajor) {
baseFiles = baseFiles.stream()
.filter(dataFile -> dataFile.fileSizeInBytes() < smallFileSize).collect(Collectors.toList());
.filter(needOptimizeFiles::contains).collect(Collectors.toList());
}
collector.addAll(baseFiles);
if (left != null) {
left.collectBaseFiles(collector, isFilterSmallFile, smallFileSize);
left.collectBaseFiles(collector, isMajor, needOptimizeFiles);
}
if (right != null) {
right.collectBaseFiles(collector, isFilterSmallFile, smallFileSize);
right.collectBaseFiles(collector, isMajor, needOptimizeFiles);
}
}

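The major-compaction path now filters against an explicit candidate list instead of a small-file size threshold. Note that needOptimizeFiles::contains is a linear scan per base file; wrapping the candidates in a HashSet makes each membership check O(1). A minimal sketch of that filter, with String paths standing in for Iceberg DataFile handles:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class BaseFileFilterSketch {
  public static void main(String[] args) {
    List<String> baseFiles = Arrays.asList("f1.parquet", "f2.parquet", "f3.parquet");
    Set<String> needOptimizeFiles = new HashSet<>(Arrays.asList("f2.parquet"));
    List<String> selected = baseFiles.stream()
        .filter(needOptimizeFiles::contains)  // O(1) per check with a HashSet
        .collect(Collectors.toList());
    System.out.println(selected);  // [f2.parquet]
  }
}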
@@ -36,7 +36,9 @@ public class TableOptimizeRuntime {
private long currentChangeSnapshotId = INVALID_SNAPSHOT_ID;
private TableOptimizeInfo.OptimizeStatus optimizeStatus = TableOptimizeInfo.OptimizeStatus.Idle;
private long optimizeStatusStartTime = -1;

private final Map<String, Long> latestMajorOptimizeTime = new HashMap<>();
private final Map<String, Long> latestFullOptimizeTime = new HashMap<>();
private final Map<String, Long> latestMinorOptimizeTime = new HashMap<>();
private String latestTaskHistoryId;
private volatile boolean isRunning;
@@ -77,6 +79,15 @@ public void putLatestMajorOptimizeTime(String partition, long time) {
}
}

public void putLatestFullOptimizeTime(String partition, long time) {
Long oldValue = latestFullOptimizeTime.putIfAbsent(partition, time);
if (oldValue != null) {
if (time > oldValue) {
latestFullOptimizeTime.put(partition, time);
}
}
}

public TableOptimizeInfo.OptimizeStatus getOptimizeStatus() {
return optimizeStatus;
}
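putLatestFullOptimizeTime keeps the newest timestamp per partition via put-if-absent plus a compare. Map.merge with Math::max is an equivalent, more compact idiom; a sketch, not the committed code:

import java.util.HashMap;
import java.util.Map;

public class LatestOptimizeTimeSketch {
  private final Map<String, Long> latestFullOptimizeTime = new HashMap<>();

  // Equivalent to putLatestFullOptimizeTime above: retain the max time per partition.
  public void putLatestFullOptimizeTime(String partition, long time) {
    latestFullOptimizeTime.merge(partition, time, Math::max);
  }
}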
@@ -99,6 +110,11 @@ public long getLatestMajorOptimizeTime(String partition) {
return time == null ? -1 : time;
}

public long getLatestFullOptimizeTime(String partition) {
Long time = latestFullOptimizeTime.get(partition);
return time == null ? -1 : time;
}

public void putLatestMinorOptimizeTime(String partition, long time) {
Long oldValue = latestMinorOptimizeTime.putIfAbsent(partition, time);
if (oldValue != null) {
@@ -118,6 +134,9 @@ public Set<String> getPartitions() {
if (MapUtils.isNotEmpty(latestMajorOptimizeTime)) {
result.addAll(latestMajorOptimizeTime.keySet());
}
if (MapUtils.isNotEmpty(latestFullOptimizeTime)) {
result.addAll(latestFullOptimizeTime.keySet());
}
if (MapUtils.isNotEmpty(latestMinorOptimizeTime)) {
result.addAll(latestMinorOptimizeTime.keySet());
}
@@ -158,6 +177,7 @@ public String toString() {
", optimizeStatus=" + optimizeStatus +
", optimizeStatusStartTime=" + optimizeStatusStartTime +
", latestMajorOptimizeTime=" + latestMajorOptimizeTime +
", latestFullOptimizeTime=" + latestFullOptimizeTime +
", latestMinorOptimizeTime=" + latestMinorOptimizeTime +
", latestTaskHistoryId='" + latestTaskHistoryId + '\'' +
", isRunning=" + isRunning +
@@ -31,7 +31,8 @@ public class TaskConfig {
private final String historyId;
private final long createTime;

public TaskConfig(String partition, @Nullable Long maxTransactionId, String group, String historyId,
public TaskConfig(String partition, @Nullable Long maxTransactionId,
String group, String historyId,
OptimizeType optimizeType, long createTime) {
this.optimizeType = optimizeType;
this.partition = partition;
Expand All @@ -49,6 +50,7 @@ public String getPartition() {
return partition;
}

@org.jetbrains.annotations.Nullable
public Long getMaxTransactionId() {
return maxTransactionId;
}
