Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]Feature optimize support hive #245

Merged
merged 68 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
afe761b
add rewrite to delete old position-delete
Jul 27, 2022
f5acc74
Merge remote-tracking branch 'origin/master'
Jul 27, 2022
0465ead
Merge branch 'NetEase:master' into master
hzluting Jul 27, 2022
78e0714
remove pos-delete record count limit
Jul 27, 2022
81c7130
Merge branch 'NetEase:master' into master
hzluting Jul 28, 2022
c09df14
Merge branch 'NetEase:master' into master
hzluting Aug 1, 2022
a72cf50
Merge branch 'NetEase:master' into master
hzluting Aug 2, 2022
53d495a
Merge branch 'NetEase:master' into master
hzluting Aug 5, 2022
8553792
Merge branch 'NetEase:master' into master
hzluting Aug 5, 2022
780e86c
Merge branch 'NetEase:master' into master
hzluting Aug 8, 2022
3efbcb5
Merge branch 'NetEase:master' into master
hzluting Aug 8, 2022
7385efa
Merge branch 'NetEase:master' into master
hzluting Aug 9, 2022
7e2dc89
Merge branch 'NetEase:master' into master
hzluting Aug 11, 2022
58cbf41
Merge branch 'NetEase:master' into master
hzluting Aug 11, 2022
39854fa
optimize support adapt hive
Aug 11, 2022
64810b8
Merge branch 'NetEase:master' into master
hzluting Aug 12, 2022
a774907
Merge branch 'NetEase:master' into master
hzluting Aug 17, 2022
8fbaf52
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 17, 2022
c6219b7
Merge branch 'NetEase:master' into master
hzluting Aug 17, 2022
53698e8
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 17, 2022
0a238b0
Merge branch 'NetEase:master' into master
hzluting Aug 18, 2022
0f11576
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 18, 2022
44a75c7
Merge branch 'NetEase:master' into master
hzluting Aug 22, 2022
9d7417b
Merge branch 'NetEase:master' into master
hzluting Aug 22, 2022
005cd1b
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 22, 2022
8186643
Merge branch 'NetEase:master' into master
hzluting Aug 22, 2022
c8baec0
Merge branch 'NetEase:master' into master
hzluting Aug 25, 2022
fb854f3
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 25, 2022
a096d66
optimize support hive and add full major optimize
Aug 25, 2022
1b50a27
Merge branch 'NetEase:master' into master
hzluting Aug 25, 2022
ab73aa9
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 25, 2022
f119a20
sync iceberg partition location to hive partition location
Aug 26, 2022
f33f838
Merge branch 'NetEase:master' into master
hzluting Aug 26, 2022
7d9a955
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 26, 2022
281c8a3
Merge branch 'NetEase:master' into master
hzluting Aug 29, 2022
44b86dc
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 29, 2022
146f4e6
adapt unit test to new struct
Aug 29, 2022
a3a5c66
add keyed unPartition table unit test
Aug 29, 2022
dab333c
add keyed unPartition table unit test
Aug 29, 2022
3af897e
Merge branch 'NetEase:master' into master
hzluting Aug 30, 2022
2016b75
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 30, 2022
a699f55
merge hive sync arctic
Aug 30, 2022
56346e6
fix code review
Aug 30, 2022
b79af16
Merge branch 'NetEase:master' into master
hzluting Aug 30, 2022
223e858
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 30, 2022
6216a7c
fix check style
Aug 30, 2022
b3570df
optimize unit test
Aug 30, 2022
a61395f
Merge branch 'NetEase:master' into master
hzluting Aug 31, 2022
6081f44
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 31, 2022
d4c16db
optimize code
Aug 31, 2022
2e0c67a
add latest full optimize time
Aug 31, 2022
da4797d
add update sql
Aug 31, 2022
32e75fb
Merge remote-tracking branch 'origin/feature-optimize-adapt-hive' int…
Aug 31, 2022
e8f0077
fix check style
Aug 31, 2022
0bf2b1f
Merge branch 'NetEase:master' into master
hzluting Aug 31, 2022
1ea79a5
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 31, 2022
46f72fb
Merge branch 'NetEase:master' into master
hzluting Aug 31, 2022
b077792
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 31, 2022
443cf5e
add file clean support hive
Aug 31, 2022
1e62f4e
Merge branch 'NetEase:master' into master
hzluting Aug 31, 2022
b9ef751
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 31, 2022
2dd4540
fix unKeyed Full Major
Aug 31, 2022
d685e49
Merge remote-tracking branch 'origin/feature-optimize-adapt-hive' int…
Aug 31, 2022
8a707c8
fix add twice
Aug 31, 2022
e98ddb6
Merge remote-tracking branch 'origin/feature-optimize-adapt-hive' int…
Aug 31, 2022
04d7d22
Merge branch 'NetEase:master' into master
hzluting Aug 31, 2022
ee7ea45
Merge remote-tracking branch 'origin/master' into feature-optimize-ad…
Aug 31, 2022
ebcd28b
fix check style
Aug 31, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion ams/ams-api/src/main/thrift/arctic_optimize_manager.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ enum JobType {

enum OptimizeType {
Minor,
Major
Major,
FullMajor
}


Expand Down
33 changes: 15 additions & 18 deletions ams/ams-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,6 @@
</exclusion>
</exclusions>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.netease.arctic</groupId>-->
<!-- <artifactId>arctic-hive</artifactId>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.apache.hadoop</groupId>-->
<!-- <artifactId>hadoop-client</artifactId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <groupId>org.apache.hadoop</groupId>-->
<!-- <artifactId>hadoop-yarn-api</artifactId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <artifactId>libthrift</artifactId>-->
<!-- <groupId>org.apache.thrift</groupId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->

<dependency>
<groupId>com.netease.arctic</groupId>
Expand All @@ -104,6 +86,13 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.netease.arctic</groupId>
<artifactId>arctic-hive</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
Expand Down Expand Up @@ -227,6 +216,14 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.netease.arctic</groupId>
<artifactId>arctic-hive</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.netease.arctic</groupId>
<artifactId>arctic-ams-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ private static void startMetaStoreThreads(
startOptimizeCommit(conf.getInteger(ArcticMetaStoreConf.OPTIMIZE_COMMIT_THREAD_POOL_SIZE));
startExpiredClean();
startOrphanClean();
startSupportHiveSync();
monitorOptimizerStatus();
tableRuntimeDataExpire();
AmsRestServer.startRestServer(httpPort);
Expand Down Expand Up @@ -245,6 +246,14 @@ private static void startOrphanClean() {
TimeUnit.MILLISECONDS);
}

private static void startSupportHiveSync() {
ThreadPool.getPool(ThreadPool.Type.HIVE_SYNC).scheduleWithFixedDelay(
ServiceContainer.getSupportHiveSyncService()::checkHiveSyncTasks,
3 * 1000L,
60 * 1000L,
TimeUnit.MILLISECONDS);
}

private static void monitorOptimizerStatus() {
OptimizeExecuteService.OptimizerMonitor monitor = new OptimizeExecuteService.OptimizerMonitor();
ThreadPool.getPool(ThreadPool.Type.OPTIMIZER_MONITOR).scheduleWithFixedDelay(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ public class ArcticMetaStoreConf {
.defaultValue(10)
.withDescription("Number of threads in the thread pool. " +
"These will be used to execute all orphan file clean processes.");
public static final ConfigOption<Integer> SUPPORT_HIVE_SYNC_THREAD_POOL_SIZE =
ConfigOptions.key("arctic.ams.support.hive.sync.thread.pool-size")
.intType()
.defaultValue(10)
.withDescription("Number of threads in the thread pool. " +
"These will be used to execute all support hive sync processes.");
public static final ConfigOption<Integer> SYNC_FILE_INFO_CACHE_THREAD_POOL_SIZE =
ConfigOptions.key("arctic.ams.file.sync.thread.pool-size")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface OptimizeTasksMapper {

@Select("select trace_id, optimize_type, catalog_name, db_name, table_name, `partition`," +
" task_group, task_history_id, max_change_transaction_id, is_delete_pos_delete," +
" source_nodes, create_time, properties, queue_id, " +
" source_nodes, create_time, properties, queue_id," +
" insert_file_size, delete_file_size, base_file_size, pos_delete_file_size," +
" insert_files, delete_files, base_files, pos_delete_files" +
" from " + TABLE_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class BaseOptimizeTask extends OptimizeTask {
protected long createTime;

private long maxChangeTransactionId = INVALID_TRANSACTION_ID;
@Deprecated
private int isDeletePosDelete;

public BaseOptimizeTask() {
Expand Down Expand Up @@ -70,7 +71,6 @@ public void setMaxChangeTransactionId(long maxChangeTransactionId) {
this.maxChangeTransactionId = maxChangeTransactionId;
}


public String getPartition() {
return partition;
}
Expand Down Expand Up @@ -143,10 +143,12 @@ public void setDeleteFileCnt(int deleteFileCnt) {
this.deleteFileCnt = deleteFileCnt;
}

@Deprecated
public int getIsDeletePosDelete() {
return isDeletePosDelete;
}

@Deprecated
public void setIsDeletePosDelete(int isDeletePosDelete) {
this.isDeletePosDelete = isDeletePosDelete;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,20 @@ public void collectDeleteFiles(List<DataFile> collector) {
}

public void collectBaseFiles(List<DataFile> collector) {
collectBaseFiles(collector, false, 0);
collectBaseFiles(collector, false, Collections.emptyList());
}

public void collectBaseFiles(List<DataFile> collector, boolean isFilterSmallFile, long smallFileSize) {
if (isFilterSmallFile) {
public void collectBaseFiles(List<DataFile> collector, boolean isMajor, List<DataFile> needOptimizeFiles) {
if (isMajor) {
baseFiles = baseFiles.stream()
.filter(dataFile -> dataFile.fileSizeInBytes() < smallFileSize).collect(Collectors.toList());
.filter(needOptimizeFiles::contains).collect(Collectors.toList());
}
collector.addAll(baseFiles);
if (left != null) {
left.collectBaseFiles(collector, isFilterSmallFile, smallFileSize);
left.collectBaseFiles(collector, isMajor, needOptimizeFiles);
}
if (right != null) {
right.collectBaseFiles(collector, isFilterSmallFile, smallFileSize);
right.collectBaseFiles(collector, isMajor, needOptimizeFiles);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public class TableOptimizeRuntime {
private long currentChangeSnapshotId = INVALID_SNAPSHOT_ID;
private TableOptimizeInfo.OptimizeStatus optimizeStatus = TableOptimizeInfo.OptimizeStatus.Idle;
private long optimizeStatusStartTime = -1;

private final Map<String, Long> latestMajorOptimizeTime = new HashMap<>();
private final Map<String, Long> latestFullMinorOptimizeTime = new HashMap<>();
private final Map<String, Long> latestMinorOptimizeTime = new HashMap<>();
private String latestTaskHistoryId;
private volatile boolean isRunning;
Expand Down Expand Up @@ -77,6 +79,15 @@ public void putLatestMajorOptimizeTime(String partition, long time) {
}
}

public void putLatestFullMajorOptimizeTime(String partition, long time) {
Long oldValue = latestFullMinorOptimizeTime.putIfAbsent(partition, time);
if (oldValue != null) {
if (time > oldValue) {
latestFullMinorOptimizeTime.put(partition, time);
}
}
}

public TableOptimizeInfo.OptimizeStatus getOptimizeStatus() {
return optimizeStatus;
}
Expand All @@ -99,6 +110,11 @@ public long getLatestMajorOptimizeTime(String partition) {
return time == null ? -1 : time;
}

public long getLatestFullMajorOptimizeTime(String partition) {
Long time = latestFullMinorOptimizeTime.get(partition);
return time == null ? -1 : time;
}

public void putLatestMinorOptimizeTime(String partition, long time) {
Long oldValue = latestMinorOptimizeTime.putIfAbsent(partition, time);
if (oldValue != null) {
Expand All @@ -118,6 +134,9 @@ public Set<String> getPartitions() {
if (MapUtils.isNotEmpty(latestMajorOptimizeTime)) {
result.addAll(latestMajorOptimizeTime.keySet());
}
if (MapUtils.isNotEmpty(latestFullMinorOptimizeTime)) {
result.addAll(latestFullMinorOptimizeTime.keySet());
}
if (MapUtils.isNotEmpty(latestMinorOptimizeTime)) {
result.addAll(latestMinorOptimizeTime.keySet());
}
Expand Down Expand Up @@ -158,6 +177,7 @@ public String toString() {
", optimizeStatus=" + optimizeStatus +
", optimizeStatusStartTime=" + optimizeStatusStartTime +
", latestMajorOptimizeTime=" + latestMajorOptimizeTime +
", latestFullMinorOptimizeTime=" + latestFullMinorOptimizeTime +
", latestMinorOptimizeTime=" + latestMinorOptimizeTime +
", latestTaskHistoryId='" + latestTaskHistoryId + '\'' +
", isRunning=" + isRunning +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public class TaskConfig {
private final String historyId;
private final long createTime;

public TaskConfig(String partition, @Nullable Long maxTransactionId, String group, String historyId,
public TaskConfig(String partition, @Nullable Long maxTransactionId,
String group, String historyId,
OptimizeType optimizeType, long createTime) {
this.optimizeType = optimizeType;
this.partition = partition;
Expand All @@ -49,6 +50,7 @@ public String getPartition() {
return partition;
}

@org.jetbrains.annotations.Nullable
public Long getMaxTransactionId() {
return maxTransactionId;
}
Expand Down
Loading