Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class ExternalMetaCacheMgr {
private ExecutorService rowCountRefreshExecutor;
private ExecutorService commonRefreshExecutor;
private ExecutorService fileListingExecutor;
// This executor is used to schedule the getting split tasks
private ExecutorService scheduleExecutor;

// catalog id -> HiveMetaStoreCache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -76,20 +77,28 @@ public SplitAssignment(
public void init() throws UserException {
splitGenerator.startSplit(backendPolicy.numBackends());
synchronized (assignLock) {
while (sampleSplit == null && waitFirstSplit()) {
final int waitIntervalTimeMillis = 100;
final int initTimeoutMillis = 30000; // 30s
int waitTotalTime = 0;
while (sampleSplit == null && needMoreSplit()) {
try {
assignLock.wait(100);
assignLock.wait(waitIntervalTimeMillis);
} catch (InterruptedException e) {
throw new UserException(e.getMessage(), e);
}
waitTotalTime += waitIntervalTimeMillis;
if (waitTotalTime > initTimeoutMillis) {
throw new UserException("Failed to get first split after waiting for "
+ (waitTotalTime / 1000) + " seconds.");
}
}
}
if (exception != null) {
throw exception;
}
}

private boolean waitFirstSplit() {
public boolean needMoreSplit() {
return !scheduleFinished.get() && !isStopped.get() && exception == null;
}

Expand All @@ -100,10 +109,16 @@ private void appendBatch(Multimap<Backend, Split> batch) throws UserException {
for (Split split : splits) {
locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys));
}
try {
assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>(10000)).put(locations);
} catch (Exception e) {
throw new UserException("Failed to offer batch split", e);
while (needMoreSplit()) {
BlockingQueue<Collection<TScanRangeLocations>> queue =
assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>(10000));
try {
if (queue.offer(locations, 100, TimeUnit.MILLISECONDS)) {
return;
}
} catch (InterruptedException e) {
addUserException(new UserException("Failed to offer batch split by interrupted", e));
}
}
}
}
Expand All @@ -120,7 +135,7 @@ public Split getSampleSplit() {
return sampleSplit;
}

public void addToQueue(List<Split> splits) {
public void addToQueue(List<Split> splits) throws UserException {
if (splits.isEmpty()) {
return;
}
Expand All @@ -130,19 +145,9 @@ public void addToQueue(List<Split> splits) {
sampleSplit = splits.get(0);
assignLock.notify();
}
try {
batch = backendPolicy.computeScanRangeAssignment(splits);
} catch (UserException e) {
exception = e;
}
}
if (batch != null) {
try {
appendBatch(batch);
} catch (UserException e) {
exception = e;
}
batch = backendPolicy.computeScanRangeAssignment(splits);
}
appendBatch(batch);
}

private void notifyAssignment() {
Expand All @@ -164,10 +169,18 @@ public BlockingQueue<Collection<TScanRangeLocations>> getAssignedSplits(Backend
}

public void setException(UserException e) {
exception = e;
addUserException(e);
notifyAssignment();
}

private void addUserException(UserException e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
}

public void finishSchedule() {
scheduleFinished.set(true);
notifyAssignment();
Expand All @@ -187,6 +200,9 @@ public void stop() {
}
});
notifyAssignment();
if (exception != null) {
throw new RuntimeException(exception);
}
}

public boolean isStop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,11 @@ protected void initLocalObjectsImpl() {
}
HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
Integer.MAX_VALUE,
String.format("hms_iceberg_catalog_%s_executor_pool", name),
true,
preExecutionAuthenticator);
ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
Integer.MAX_VALUE,
String.format("hms_iceberg_catalog_%s_executor_pool", name),
true,
preExecutionAuthenticator);
FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
this.bindBrokerName(), this.catalogProperty.getHadoopProperties());
this.fileSystemExecutor = ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ public void startSplit(int numBackends) {
if (allFiles.size() > numSplitsPerPartition.get()) {
numSplitsPerPartition.set(allFiles.size());
}
splitAssignment.addToQueue(allFiles);
if (splitAssignment.needMoreSplit()) {
splitAssignment.addToQueue(allFiles);
}
} catch (Exception e) {
batchException.set(new UserException(e.getMessage(), e));
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -417,6 +418,7 @@ public void startSplit(int numBackends) {
return;
}
AtomicInteger numFinishedPartitions = new AtomicInteger(0);
ExecutorService scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
CompletableFuture.runAsync(() -> {
for (HivePartition partition : prunedPartitions) {
if (batchException.get() != null || splitAssignment.isStop()) {
Expand All @@ -435,8 +437,10 @@ public void startSplit(int numBackends) {
if (allFiles.size() > numSplitsPerPartition.get()) {
numSplitsPerPartition.set(allFiles.size());
}
splitAssignment.addToQueue(allFiles);
} catch (IOException e) {
if (splitAssignment.needMoreSplit()) {
splitAssignment.addToQueue(allFiles);
}
} catch (Exception e) {
batchException.set(new UserException(e.getMessage(), e));
} finally {
splittersOnFlight.release();
Expand All @@ -447,12 +451,12 @@ public void startSplit(int numBackends) {
splitAssignment.finishSchedule();
}
}
});
}, scheduleExecutor);
}
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
}
});
}, scheduleExecutor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
Expand Down Expand Up @@ -78,6 +79,7 @@
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class IcebergScanNode extends FileQueryScanNode {

Expand Down Expand Up @@ -229,19 +231,21 @@ public void startSplit(int numBackends) throws UserException {
public void doStartSplit() {
TableScan scan = createTableScan();
CompletableFuture.runAsync(() -> {
AtomicReference<CloseableIterable<FileScanTask>> taskRef = new AtomicReference<>();
try {
preExecutionAuthenticator.execute(
() -> {
CloseableIterable<FileScanTask> fileScanTasks = planFileScanTask(scan);

// 1. this task should stop when all splits are assigned
// 2. if we want to stop this plan, we can close the fileScanTasks to stop
splitAssignment.addCloseable(fileScanTasks);

fileScanTasks.forEach(fileScanTask ->
splitAssignment.addToQueue(Lists.newArrayList(createIcebergSplit(fileScanTask))));

return null;
taskRef.set(fileScanTasks);

CloseableIterator<FileScanTask> iterator = fileScanTasks.iterator();
while (splitAssignment.needMoreSplit() && iterator.hasNext()) {
try {
splitAssignment.addToQueue(Lists.newArrayList(createIcebergSplit(iterator.next())));
} catch (UserException e) {
throw new RuntimeException(e);
}
}
}
);
splitAssignment.finishSchedule();
Expand All @@ -252,8 +256,16 @@ public void doStartSplit() {
} else {
splitAssignment.setException(new UserException(e.getMessage(), e));
}
} finally {
if (taskRef.get() != null) {
try {
taskRef.get().close();
} catch (IOException e) {
// ignore
}
}
}
});
}, Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,10 @@ public void startSplit(int numBackends) {
createTableBatchReadSession(requiredBatchPartitionSpecs);
List<Split> batchSplit = getSplitByTableSession(tableBatchReadSession);

splitAssignment.addToQueue(batchSplit);
} catch (IOException e) {
if (splitAssignment.needMoreSplit()) {
splitAssignment.addToQueue(batchSplit);
}
} catch (Exception e) {
batchException.set(new UserException(e.getMessage(), e));
} finally {
if (batchException.get() != null) {
Expand All @@ -288,7 +290,7 @@ public void startSplit(int numBackends) {
splitAssignment.setException(batchException.get());
}
}
});
}, scheduleExecutor);
}

@Override
Expand Down
Loading
Loading