Skip to content

Commit

Permalink
shutdown this workerPool in the finally block
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangbutao committed May 23, 2024
1 parent ab5003f commit ef6d684
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,17 +209,25 @@ public List<InputSplit> getSplits(JobContext context) {
conf.set(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tbl.name(), SerializationUtil.serializeToBase64(tbl));
return tbl;
});
final ExecutorService workerPool =
ThreadPools.newWorkerPool("iceberg-plan-worker-pool",
conf.getInt(InputFormatConfig.TABLE_PLAN_WORKER_POOL_SIZE, ThreadPools.WORKER_THREAD_POOL_SIZE));
try {
return planInputSplits(table, conf, workerPool);
} finally {
workerPool.shutdown();
}

}

private List<InputSplit> planInputSplits(Table table, Configuration conf, ExecutorService workerPool) {
List<InputSplit> splits = Lists.newArrayList();
boolean applyResidual = !conf.getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
InputFormatConfig.InMemoryDataModel model = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
InputFormatConfig.InMemoryDataModel.GENERIC);

long fromVersion = conf.getLong(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, -1);
Scan<?, FileScanTask, CombinedScanTask> scan;
final ExecutorService workerPool =
ThreadPools.newWorkerPool("iceberg-plan-worker-pool",
conf.getInt(InputFormatConfig.TABLE_PLAN_WORKER_POOL_SIZE, ThreadPools.WORKER_THREAD_POOL_SIZE));
if (fromVersion != -1) {
scan = applyConfig(conf, createIncrementalAppendScan(table, conf)).planWith(workerPool);
} else {
Expand Down

0 comments on commit ef6d684

Please sign in to comment.