From 734ce0acfb75d69cec723eac1ae7f9823513cd9b Mon Sep 17 00:00:00 2001 From: daidai Date: Mon, 27 Apr 2026 21:25:49 +0800 Subject: [PATCH 1/3] [improvement](maxcompute) Simplify FE block ID requests for MaxCompute writes --- .../writer/maxcompute/vmc_table_writer.cpp | 6 + be/src/util/jni-util.cpp | 4 - be/src/util/jni_native_method.cpp | 129 --------- be/src/util/jni_native_method.h | 4 - .../common/jni/utils/JNINativeMethod.java | 4 - .../max-compute-connector/pom.xml | 6 + .../doris/maxcompute/MaxComputeFeClient.java | 270 ++++++++++++++++++ .../doris/maxcompute/MaxComputeJniWriter.java | 9 +- .../maxcompute/MaxComputeFeClientTest.java | 177 ++++++++++++ .../java/org/apache/doris/common/Config.java | 7 + .../datasource/maxcompute/MCTransaction.java | 7 +- 11 files changed, 475 insertions(+), 148 deletions(-) create mode 100644 fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java create mode 100644 fe/be-java-extensions/max-compute-connector/src/test/java/org/apache/doris/maxcompute/MaxComputeFeClientTest.java diff --git a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp index 8bcd27140207c6..a7818ea01d27b2 100644 --- a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp +++ b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp @@ -22,6 +22,7 @@ #include "exprs/vexpr.h" #include "exprs/vexpr_context.h" #include "format/transformer/vjni_format_transformer.h" +#include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "util/uid_util.h" @@ -98,6 +99,7 @@ Status VMCTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { std::map VMCTableWriter::_build_base_writer_params() { auto params = _mc_sink.properties; + const auto& master_fe_addr = _state->exec_env()->cluster_info()->master_fe_addr; if (_mc_sink.__isset.endpoint) params["endpoint"] = _mc_sink.endpoint; if (_mc_sink.__isset.project) params["project"] = _mc_sink.project; if (_mc_sink.__isset.table_name) params["table"] = _mc_sink.table_name; @@ -117,6 +119,10 @@ std::map VMCTableWriter::_build_base_writer_params() { if (_mc_sink.__isset.retry_count) { params["retry_count"] = std::to_string(_mc_sink.retry_count); } + params["fe_host"] = master_fe_addr.hostname; + params["fe_port"] = std::to_string(master_fe_addr.port); + params["fe_rpc_timeout_ms"] = std::to_string(config::thrift_rpc_timeout_ms); + params["fe_thrift_server_type"] = config::thrift_server_type_of_fe; return params; } diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp index 7d4a38690856fa..944b8541146a6e 100644 --- a/be/src/util/jni-util.cpp +++ b/be/src/util/jni-util.cpp @@ -405,8 +405,6 @@ Status Util::_init_register_natives() { static char memory_alloc_batch_sign[] = "([I)[J"; static char memory_free_batch_name[] = "memoryTrackerFreeBatch"; static char memory_free_batch_sign[] = "([J)V"; - static char request_mc_block_id_name[] = "requestMaxComputeBlockId"; - static char request_mc_block_id_sign[] = "(JLjava/lang/String;)J"; static JNINativeMethod java_native_methods[] = { {memory_alloc_name, memory_alloc_sign, (void*)&JavaNativeMethods::memoryMalloc}, {memory_free_name, memory_free_sign, (void*)&JavaNativeMethods::memoryFree}, @@ -414,8 +412,6 @@ Status Util::_init_register_natives() { (void*)&JavaNativeMethods::memoryMallocBatch}, {memory_free_batch_name, memory_free_batch_sign, (void*)&JavaNativeMethods::memoryFreeBatch}, - {request_mc_block_id_name, request_mc_block_id_sign, - (void*)&JavaNativeMethods::requestMaxComputeBlockId}, }; int res = env->RegisterNatives(local_jni_native_exc_cl, java_native_methods, diff --git a/be/src/util/jni_native_method.cpp b/be/src/util/jni_native_method.cpp index 6942095b37622c..549405c766d508 100644 --- a/be/src/util/jni_native_method.cpp +++ b/be/src/util/jni_native_method.cpp @@ -17,119 +17,14 @@ #include "util/jni_native_method.h" -#include -#include - -#include #include -#include #include -#include "common/status.h" #include "jni.h" -#include "runtime/exec_env.h" -#include "util/client_cache.h" #include "util/defer_op.h" -#include "util/thrift_rpc_helper.h" namespace doris { -namespace { - -void throw_java_runtime_exception(JNIEnv* env, const std::string& message) { - jclass exception_cl = env->FindClass("java/lang/IllegalStateException"); - if (exception_cl != nullptr) { - env->ThrowNew(exception_cl, message.c_str()); - env->DeleteLocalRef(exception_cl); - } -} - -Result request_maxcompute_block_id_from_fe(int64_t txn_id, - const std::string& write_session_id) { - if (txn_id <= 0) { - return ResultError(Status::InvalidArgument( - "invalid MaxCompute txn_id for block_id allocation: {}", txn_id)); - } - if (write_session_id.empty()) { - return ResultError(Status::InvalidArgument( - "empty MaxCompute write_session_id for block_id allocation")); - } - - constexpr uint32_t FETCH_BLOCK_ID_MAX_RETRY_TIMES = 3; - TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; - for (uint32_t retry_times = 0; retry_times < FETCH_BLOCK_ID_MAX_RETRY_TIMES; retry_times++) { - TMaxComputeBlockIdRequest request; - TMaxComputeBlockIdResult result; - request.__set_txn_id(txn_id); - request.__set_write_session_id(write_session_id); - request.__set_length(1); - - Status rpc_status = ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, - [&request, &result](FrontendServiceConnection& client) { - client->getMaxComputeBlockIdRange(result, request); - }); - - if (!rpc_status.ok()) { - LOG(WARNING) << "Failed to allocate MaxCompute block_id, rpc failure, retry_time=" - << retry_times << ", txn_id=" << txn_id - << ", write_session_id=" << write_session_id << ", status=" << rpc_status; - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - continue; - } - - if (!result.__isset.status) { - return ResultError(Status::RpcError( - "failed to allocate MaxCompute block_id from FE, missing status in response, " - "txn_id={}, write_session_id={}", - txn_id, write_session_id)); - } - - Status fe_status = Status::create(result.status); - if (fe_status.is()) { - if (!result.__isset.master_address) { - return ResultError(Status::RpcError( - "failed to allocate MaxCompute block_id from FE, missing master address " - "in NOT_MASTER response, txn_id={}, write_session_id={}", - txn_id, write_session_id)); - } - LOG(WARNING) << "Failed to allocate MaxCompute block_id, requested non-master FE@" - << master_addr.hostname << ":" << master_addr.port << ", switch to FE@" - << result.master_address.hostname << ":" << result.master_address.port - << ", retry_time=" << retry_times << ", txn_id=" << txn_id - << ", write_session_id=" << write_session_id; - master_addr = result.master_address; - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - continue; - } - - if (!fe_status.ok()) { - LOG(WARNING) << "Failed to allocate MaxCompute block_id, FE returned error, retry_time=" - << retry_times << ", txn_id=" << txn_id - << ", write_session_id=" << write_session_id << ", status=" << fe_status; - return ResultError(std::move(fe_status)); - } - - if (result.length != 1) { - return ResultError(Status::RpcError( - "failed to allocate MaxCompute block_id from FE, expected length=1 but got " - "{}, txn_id={}, write_session_id={}", - result.length, txn_id, write_session_id)); - } - - LOG(INFO) << "Allocated MaxCompute block_id from FE@" << master_addr.hostname << ":" - << master_addr.port << ", txn_id=" << txn_id - << ", write_session_id=" << write_session_id << ", block_id=" << result.start; - return result.start; - } - - return ResultError(Status::RpcError( - "failed to allocate MaxCompute block_id from FE, txn_id={}, write_session_id={}", - txn_id, write_session_id)); -} - -} // namespace - jlong JavaNativeMethods::memoryMalloc(JNIEnv* env, jclass clazz, jlong bytes) { return reinterpret_cast(malloc(bytes)); } @@ -209,28 +104,4 @@ void JavaNativeMethods::memoryFreeBatch(JNIEnv* env, jclass clazz, jlongArray ad env->ReleaseLongArrayElements(addrs, elems, JNI_ABORT); } -jlong JavaNativeMethods::requestMaxComputeBlockId(JNIEnv* env, jclass clazz, jlong txn_id, - jstring write_session_id) { - if (write_session_id == nullptr) { - throw_java_runtime_exception( - env, "MaxCompute write_session_id is null when requesting block_id"); - return 0; - } - - const char* write_session_id_chars = env->GetStringUTFChars(write_session_id, nullptr); - if (write_session_id_chars == nullptr) { - throw_java_runtime_exception(env, "Failed to read MaxCompute write_session_id from Java"); - return 0; - } - std::string write_session_id_str(write_session_id_chars); - env->ReleaseStringUTFChars(write_session_id, write_session_id_chars); - - auto block_id = request_maxcompute_block_id_from_fe(txn_id, write_session_id_str); - if (!block_id.has_value()) { - throw_java_runtime_exception(env, block_id.error().to_string()); - return 0; - } - return static_cast(block_id.value()); -} - } // namespace doris diff --git a/be/src/util/jni_native_method.h b/be/src/util/jni_native_method.h index 23429c3500a8be..48c74d91d67836 100644 --- a/be/src/util/jni_native_method.h +++ b/be/src/util/jni_native_method.h @@ -42,10 +42,6 @@ struct JavaNativeMethods { // Batch free multiple addresses; addrs is a long[] static void memoryFreeBatch(JNIEnv* env, jclass clazz, jlongArray addrs); - - // Request a MaxCompute block id from FE via BE. - static jlong requestMaxComputeBlockId(JNIEnv* env, jclass clazz, jlong txn_id, - jstring write_session_id); }; } // namespace doris diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JNINativeMethod.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JNINativeMethod.java index 1104a5fa934c1f..d48fe8e9347029 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JNINativeMethod.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JNINativeMethod.java @@ -43,8 +43,4 @@ public class JNINativeMethod { */ public static native void memoryTrackerFreeBatch(long[] addrs); - /** - * Request a MaxCompute block id from BE, which will forward the request to FE. - */ - public static native long requestMaxComputeBlockId(long txnId, String writeSessionId); } diff --git a/fe/be-java-extensions/max-compute-connector/pom.xml b/fe/be-java-extensions/max-compute-connector/pom.xml index 8f84c6b31aab45..74c528fa2f12d2 100644 --- a/fe/be-java-extensions/max-compute-connector/pom.xml +++ b/fe/be-java-extensions/max-compute-connector/pom.xml @@ -41,6 +41,12 @@ under the License. ${project.version} provided + + org.apache.doris + fe-thrift + ${project.version} + provided + com.aliyun.odps odps-sdk-core diff --git a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java new file mode 100644 index 00000000000000..757d5fe60dc025 --- /dev/null +++ b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java @@ -0,0 +1,270 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.maxcompute; + +import org.apache.doris.thrift.FrontendService; +import org.apache.doris.thrift.TMaxComputeBlockIdRequest; +import org.apache.doris.thrift.TMaxComputeBlockIdResult; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatus; +import org.apache.doris.thrift.TStatusCode; + +import org.apache.log4j.Logger; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.layered.TFramedTransport; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * FE thrift client used by MaxCompute writer runtime code in BE's embedded JVM. + */ +class MaxComputeFeClient { + static final String FE_HOST = "fe_host"; + static final String FE_PORT = "fe_port"; + static final String FE_RPC_TIMEOUT_MS = "fe_rpc_timeout_ms"; + static final String FE_THRIFT_SERVER_TYPE = "fe_thrift_server_type"; + + private static final Logger LOG = Logger.getLogger(MaxComputeFeClient.class); + private static final int FETCH_BLOCK_ID_MAX_RETRY_TIMES = 3; + private static final long FETCH_BLOCK_ID_RETRY_SLEEP_MS = 10L; + private static final long FETCH_BLOCK_ID_LENGTH = 1L; + private static final int DEFAULT_FE_RPC_TIMEOUT_MS = 60000; + private static final String THREADED_SELECTOR = "THREADED_SELECTOR"; + private static final String THREAD_POOL = "THREAD_POOL"; + + private final int rpcTimeoutMs; + private final String thriftServerType; + private final RpcExecutor rpcExecutor; + private final long retrySleepMs; + private TNetworkAddress masterAddress; + + static MaxComputeFeClient create(Map params) { + String host = requireParam(params, FE_HOST); + int port = Integer.parseInt(requireParam(params, FE_PORT)); + int timeoutMs = Integer.parseInt(params.getOrDefault(FE_RPC_TIMEOUT_MS, + String.valueOf(DEFAULT_FE_RPC_TIMEOUT_MS))); + String serverType = params.getOrDefault(FE_THRIFT_SERVER_TYPE, THREAD_POOL); + return new MaxComputeFeClient(new TNetworkAddress(host, port), timeoutMs, serverType); + } + + MaxComputeFeClient(TNetworkAddress masterAddress, int rpcTimeoutMs, String thriftServerType) { + this(masterAddress, rpcTimeoutMs, thriftServerType, new RpcExecutor() { + @Override + public T call(TNetworkAddress address, int timeoutMs, boolean useFramedTransport, + FeCall call) throws Exception { + return callFe(address, timeoutMs, useFramedTransport, call); + } + }, FETCH_BLOCK_ID_RETRY_SLEEP_MS); + } + + MaxComputeFeClient(TNetworkAddress masterAddress, int rpcTimeoutMs, String thriftServerType, + RpcExecutor rpcExecutor, long retrySleepMs) { + this.masterAddress = copyAddress(Objects.requireNonNull(masterAddress, "masterAddress")); + this.rpcTimeoutMs = rpcTimeoutMs; + this.thriftServerType = thriftServerType == null ? THREAD_POOL : thriftServerType; + this.rpcExecutor = Objects.requireNonNull(rpcExecutor, "rpcExecutor"); + this.retrySleepMs = retrySleepMs; + } + + long requestBlockId(long txnId, String writeSessionId) throws IOException { + if (txnId <= 0) { + throw new IOException("invalid MaxCompute txn_id for block_id allocation: " + txnId); + } + if (writeSessionId == null || writeSessionId.isEmpty()) { + throw new IOException("empty MaxCompute write_session_id for block_id allocation"); + } + + TMaxComputeBlockIdRequest request = buildBlockIdRequest(txnId, writeSessionId); + return callWithMasterRedirect( + "allocate MaxCompute block_id", + client -> client.getMaxComputeBlockIdRange(request), + (result, requestAddress, retryTimes) -> + handleBlockIdResult(result, requestAddress, retryTimes, txnId, writeSessionId)); + } + + private R callWithMasterRedirect(String operation, FeCall call, ResponseHandler handler) + throws IOException { + validateAddress(masterAddress); + + Exception lastException = null; + for (int retryTimes = 0; retryTimes < FETCH_BLOCK_ID_MAX_RETRY_TIMES; retryTimes++) { + TNetworkAddress requestAddress = copyAddress(masterAddress); + T result; + try { + result = rpcExecutor.call(requestAddress, rpcTimeoutMs, useFramedTransport(), call); + } catch (Exception e) { + lastException = e; + LOG.warn("Failed to " + operation + ", rpc failure, retry_time=" + + retryTimes + ", fe=" + formatAddress(requestAddress), e); + sleepBeforeRetry(); + continue; + } + + try { + return handler.handle(result, requestAddress, retryTimes); + } catch (NotMasterException e) { + masterAddress = copyAddress(e.masterAddress); + lastException = e; + sleepBeforeRetry(); + } + } + + throw new IOException("failed to " + operation + " from FE", lastException); + } + + private long handleBlockIdResult(TMaxComputeBlockIdResult result, TNetworkAddress requestAddress, int retryTimes, + long txnId, String writeSessionId) throws IOException, NotMasterException { + if (result == null || !result.isSetStatus()) { + throw new IOException("failed to allocate MaxCompute block_id from FE, missing status in response, " + + "txn_id=" + txnId + ", write_session_id=" + writeSessionId); + } + + TStatus status = result.getStatus(); + TStatusCode code = status.getStatusCode(); + if (code == null) { + throw new IOException("failed to allocate MaxCompute block_id from FE, missing status code, " + + "txn_id=" + txnId + ", write_session_id=" + writeSessionId); + } + if (code == TStatusCode.NOT_MASTER) { + if (!result.isSetMasterAddress()) { + throw new IOException("failed to allocate MaxCompute block_id from FE, missing master address " + + "in NOT_MASTER response, txn_id=" + txnId + ", write_session_id=" + writeSessionId); + } + LOG.warn("Failed to allocate MaxCompute block_id, requested non-master FE@" + + formatAddress(requestAddress) + ", switch to FE@" + formatAddress(result.getMasterAddress()) + + ", retry_time=" + retryTimes + ", txn_id=" + txnId + + ", write_session_id=" + writeSessionId); + throw new NotMasterException(result.getMasterAddress()); + } + + if (code != TStatusCode.OK) { + throw new IOException("failed to allocate MaxCompute block_id from FE, status=" + + statusErrorMessage(status) + ", txn_id=" + txnId + + ", write_session_id=" + writeSessionId); + } + + if (!result.isSetStart()) { + throw new IOException("failed to allocate MaxCompute block_id from FE, missing start in response, " + + "txn_id=" + txnId + ", write_session_id=" + writeSessionId); + } + if (!result.isSetLength() || result.getLength() != FETCH_BLOCK_ID_LENGTH) { + throw new IOException("failed to allocate MaxCompute block_id from FE, expected length=1 but got " + + result.getLength() + ", txn_id=" + txnId + ", write_session_id=" + writeSessionId); + } + + LOG.info("Allocated MaxCompute block_id from FE@" + formatAddress(requestAddress) + + ", txn_id=" + txnId + ", write_session_id=" + writeSessionId + + ", block_id=" + result.getStart()); + return result.getStart(); + } + + private static TMaxComputeBlockIdRequest buildBlockIdRequest(long txnId, String writeSessionId) { + TMaxComputeBlockIdRequest request = new TMaxComputeBlockIdRequest(); + request.setTxnId(txnId); + request.setWriteSessionId(writeSessionId); + request.setLength(FETCH_BLOCK_ID_LENGTH); + return request; + } + + private boolean useFramedTransport() { + return THREADED_SELECTOR.equalsIgnoreCase(thriftServerType); + } + + private void sleepBeforeRetry() throws IOException { + try { + Thread.sleep(retrySleepMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("interrupted while retrying MaxCompute block_id allocation", e); + } + } + + private static T callFe(TNetworkAddress address, int timeoutMs, + boolean useFramedTransport, FeCall call) throws Exception { + TSocket socket = new TSocket(address.getHostname(), address.getPort(), timeoutMs); + TTransport transport = useFramedTransport ? new TFramedTransport(socket) : socket; + try { + transport.open(); + FrontendService.Client client = new FrontendService.Client(new TBinaryProtocol(transport)); + return call.call(client); + } finally { + transport.close(); + } + } + + private static void validateAddress(TNetworkAddress address) throws IOException { + if (address.getHostname() == null || address.getHostname().isEmpty() || address.getPort() <= 0) { + throw new IOException("invalid FE address for MaxCompute block_id allocation: " + + formatAddress(address)); + } + } + + private static String statusErrorMessage(TStatus status) { + List errorMsgs = status.getErrorMsgs(); + if (errorMsgs == null || errorMsgs.isEmpty()) { + return status.getStatusCode().name(); + } + return status.getStatusCode().name() + ": " + String.join("; ", errorMsgs); + } + + private static String requireParam(Map params, String key) { + String value = params.get(key); + if (value == null || value.isEmpty()) { + throw new IllegalArgumentException("required property '" + key + "'."); + } + return value; + } + + private static TNetworkAddress copyAddress(TNetworkAddress address) { + return new TNetworkAddress(address.getHostname(), address.getPort()); + } + + private static String formatAddress(TNetworkAddress address) { + if (address == null) { + return "null"; + } + return address.getHostname() + ":" + address.getPort(); + } + + interface RpcExecutor { + T call(TNetworkAddress address, int timeoutMs, boolean useFramedTransport, + FeCall call) throws Exception; + } + + interface FeCall { + T call(FrontendService.Client client) throws Exception; + } + + private interface ResponseHandler { + R handle(T result, TNetworkAddress requestAddress, int retryTimes) throws IOException, NotMasterException; + } + + private static class NotMasterException extends Exception { + private final TNetworkAddress masterAddress; + + NotMasterException(TNetworkAddress masterAddress) { + super("not master, master=" + formatAddress(masterAddress)); + this.masterAddress = copyAddress(masterAddress); + } + } +} diff --git a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java index 38bfc7f17a70b4..c4d6fab8814ac0 100644 --- a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java +++ b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java @@ -18,7 +18,6 @@ package org.apache.doris.maxcompute; import org.apache.doris.common.jni.JniWriter; -import org.apache.doris.common.jni.utils.JNINativeMethod; import org.apache.doris.common.jni.vec.VectorColumn; import org.apache.doris.common.jni.vec.VectorTable; import org.apache.doris.common.maxcompute.MCProperties; @@ -115,6 +114,7 @@ public class MaxComputeJniWriter extends JniWriter { private final int readTimeout; private final int retryCount; private final long maxBlockBytes; + private final MaxComputeFeClient feClient; // Storage API objects private TableBatchWriteSession writeSession; @@ -155,6 +155,7 @@ public MaxComputeJniWriter(int batchSize, Map params) { this.maxBlockBytes = Long.parseLong( params.getOrDefault(MCProperties.WRITE_MAX_BLOCK_BYTES, MCProperties.DEFAULT_WRITE_MAX_BLOCK_BYTES)); + this.feClient = MaxComputeFeClient.create(params); } @Override @@ -241,12 +242,12 @@ protected void writeInternal(VectorTable inputTable) throws IOException { } } - private long resolveInitialBlockId() { + private long resolveInitialBlockId() throws IOException { return preallocatedBlockId != null ? preallocatedBlockId : requestBlockId(); } - private long requestBlockId() { - return JNINativeMethod.requestMaxComputeBlockId(txnId, writeSessionId); + private long requestBlockId() throws IOException { + return feClient.requestBlockId(txnId, writeSessionId); } private void openBatchWriter(long blockId) throws IOException { diff --git a/fe/be-java-extensions/max-compute-connector/src/test/java/org/apache/doris/maxcompute/MaxComputeFeClientTest.java b/fe/be-java-extensions/max-compute-connector/src/test/java/org/apache/doris/maxcompute/MaxComputeFeClientTest.java new file mode 100644 index 00000000000000..923919de408232 --- /dev/null +++ b/fe/be-java-extensions/max-compute-connector/src/test/java/org/apache/doris/maxcompute/MaxComputeFeClientTest.java @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.maxcompute; + +import org.apache.doris.thrift.TMaxComputeBlockIdRequest; +import org.apache.doris.thrift.TMaxComputeBlockIdResult; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatus; +import org.apache.doris.thrift.TStatusCode; + +import org.apache.thrift.protocol.TProtocol; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Queue; + +public class MaxComputeFeClientTest { + @Test + public void testRequestBlockIdSuccess() throws Exception { + FakeExecutor executor = new FakeExecutor(okResult(42L)); + MaxComputeFeClient client = new MaxComputeFeClient( + new TNetworkAddress("fe1", 9010), 1234, "THREAD_POOL", executor, 0); + + Assert.assertEquals(42L, client.requestBlockId(100L, "session-1")); + Assert.assertEquals(1, executor.addresses.size()); + Assert.assertEquals("fe1", executor.addresses.get(0).getHostname()); + Assert.assertEquals(9010, executor.addresses.get(0).getPort()); + Assert.assertFalse(executor.framedTransports.get(0)); + Assert.assertEquals(1234, (int) executor.timeouts.get(0)); + Assert.assertEquals(100L, executor.requests.get(0).getTxnId()); + Assert.assertEquals("session-1", executor.requests.get(0).getWriteSessionId()); + Assert.assertEquals(1L, executor.requests.get(0).getLength()); + } + + @Test + public void testRequestBlockIdRedirectsToMaster() throws Exception { + FakeExecutor executor = new FakeExecutor(notMasterResult("master", 9020), okResult(7L)); + MaxComputeFeClient client = new MaxComputeFeClient( + new TNetworkAddress("follower", 9010), 1234, "THREADED_SELECTOR", executor, 0); + + Assert.assertEquals(7L, client.requestBlockId(101L, "session-2")); + Assert.assertEquals(2, executor.addresses.size()); + Assert.assertEquals("follower", executor.addresses.get(0).getHostname()); + Assert.assertEquals("master", executor.addresses.get(1).getHostname()); + Assert.assertTrue(executor.framedTransports.get(0)); + Assert.assertTrue(executor.framedTransports.get(1)); + } + + @Test + public void testFeErrorFailsWithoutRetry() { + FakeExecutor executor = new FakeExecutor(errorResult("allocation failed")); + MaxComputeFeClient client = new MaxComputeFeClient( + new TNetworkAddress("fe1", 9010), 1234, "THREAD_POOL", executor, 0); + + expectIOExceptionContains(() -> client.requestBlockId(102L, "session-3"), "allocation failed"); + Assert.assertEquals(1, executor.addresses.size()); + } + + @Test + public void testRpcFailureRetries() throws Exception { + FakeExecutor executor = new FakeExecutor( + new IOException("connect failed"), + new IOException("temporary failure"), + okResult(9L)); + MaxComputeFeClient client = new MaxComputeFeClient( + new TNetworkAddress("fe1", 9010), 1234, "THREAD_POOL", executor, 0); + + Assert.assertEquals(9L, client.requestBlockId(103L, "session-4")); + Assert.assertEquals(3, executor.addresses.size()); + } + + private static TMaxComputeBlockIdResult okResult(long start) { + TMaxComputeBlockIdResult result = new TMaxComputeBlockIdResult(); + result.setStatus(new TStatus(TStatusCode.OK)); + result.setStart(start); + result.setLength(1L); + return result; + } + + private static TMaxComputeBlockIdResult notMasterResult(String host, int port) { + TMaxComputeBlockIdResult result = new TMaxComputeBlockIdResult(); + result.setStatus(new TStatus(TStatusCode.NOT_MASTER)); + result.setMasterAddress(new TNetworkAddress(host, port)); + return result; + } + + private static TMaxComputeBlockIdResult errorResult(String errorMsg) { + TStatus status = new TStatus(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(errorMsg); + TMaxComputeBlockIdResult result = new TMaxComputeBlockIdResult(); + result.setStatus(status); + return result; + } + + private static void expectIOExceptionContains(IOAction action, String expectedMessage) { + try { + action.run(); + Assert.fail("expected IOException"); + } catch (IOException e) { + Assert.assertTrue(e.getMessage(), e.getMessage().contains(expectedMessage)); + } + } + + private interface IOAction { + void run() throws IOException; + } + + private static class FakeExecutor implements MaxComputeFeClient.RpcExecutor { + private final Queue responses; + private final List addresses = new ArrayList<>(); + private final List timeouts = new ArrayList<>(); + private final List framedTransports = new ArrayList<>(); + private final List requests = new ArrayList<>(); + + FakeExecutor(Object... responses) { + this.responses = new ArrayDeque<>(Arrays.asList(responses)); + } + + @Override + public T call(TNetworkAddress address, int timeoutMs, boolean useFramedTransport, + MaxComputeFeClient.FeCall call) throws Exception { + addresses.add(new TNetworkAddress(address.getHostname(), address.getPort())); + timeouts.add(timeoutMs); + framedTransports.add(useFramedTransport); + + FrontendServiceClient client = new FrontendServiceClient(); + return call.call(client); + } + + private class FrontendServiceClient extends org.apache.doris.thrift.FrontendService.Client { + FrontendServiceClient() { + super((TProtocol) null); + } + + @Override + public TMaxComputeBlockIdResult getMaxComputeBlockIdRange(TMaxComputeBlockIdRequest request) + throws org.apache.thrift.TException { + requests.add(request); + + Object response = responses.remove(); + if (response instanceof RuntimeException) { + throw (RuntimeException) response; + } + if (response instanceof IOException) { + throw new RuntimeException((IOException) response); + } + if (response instanceof org.apache.thrift.TException) { + throw (org.apache.thrift.TException) response; + } + if (response instanceof Exception) { + throw new RuntimeException((Exception) response); + } + return (TMaxComputeBlockIdResult) response; + } + } + } +} diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index f0581b78574d70..e42890f019755f 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2148,6 +2148,13 @@ public class Config extends ConfigBase { @ConfField(description = {"Maximum cached file number for external table split file meta cache at query level."}) public static long max_external_table_split_file_meta_cache_num = 100000; + /** + * Maximum number of MaxCompute Storage API write block IDs that can be allocated in one write session. + */ + @ConfField(mutable = true, masterOnly = true, description = { + "Maximum number of MaxCompute Storage API write block IDs that can be allocated in one write session."}) + public static long max_compute_write_max_block_count = 20000L; + /** * Max cache loader thread-pool size. * Max thread pool size for loading external meta cache diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java index 76a3c84ebb7602..9f1c61ddf248fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.maxcompute; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext; @@ -49,7 +50,6 @@ public class MCTransaction implements Transaction { private static final Logger LOG = LogManager.getLogger(MCTransaction.class); - private static final long MAX_BLOCK_COUNT = 20000L; private final MaxComputeExternalCatalog catalog; private MaxComputeExternalTable table; @@ -147,9 +147,10 @@ public long allocateBlockIdRange(String requestWriteSessionId, long length) thro do { start = nextBlockId.get(); endExclusive = start + length; - if (endExclusive > MAX_BLOCK_COUNT) { + if (endExclusive > Config.max_compute_write_max_block_count) { throw new UserException("MaxCompute block_id exceeds limit, start=" - + start + ", length=" + length + ", maxBlockCount=" + MAX_BLOCK_COUNT); + + start + ", length=" + length + ", maxBlockCount=" + + Config.max_compute_write_max_block_count); } } while (!nextBlockId.compareAndSet(start, endExclusive)); From e508d3027365bf439a48e6fd0c6d5373cb11fe06 Mon Sep 17 00:00:00 2001 From: daidai Date: Mon, 27 Apr 2026 22:37:19 +0800 Subject: [PATCH 2/3] fix comment --- .../doris/maxcompute/MaxComputeFeClient.java | 95 +++++++++++++++---- .../doris/maxcompute/MaxComputeJniWriter.java | 2 + .../java/org/apache/doris/common/Config.java | 2 +- 3 files changed, 79 insertions(+), 20 deletions(-) diff --git a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java index 757d5fe60dc025..b7f9f1c7489ab4 100644 --- a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java +++ b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java @@ -28,6 +28,7 @@ import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.layered.TFramedTransport; import java.io.IOException; @@ -38,7 +39,7 @@ /** * FE thrift client used by MaxCompute writer runtime code in BE's embedded JVM. */ -class MaxComputeFeClient { +class MaxComputeFeClient implements AutoCloseable { static final String FE_HOST = "fe_host"; static final String FE_PORT = "fe_port"; static final String FE_RPC_TIMEOUT_MS = "fe_rpc_timeout_ms"; @@ -68,13 +69,8 @@ static MaxComputeFeClient create(Map params) { } MaxComputeFeClient(TNetworkAddress masterAddress, int rpcTimeoutMs, String thriftServerType) { - this(masterAddress, rpcTimeoutMs, thriftServerType, new RpcExecutor() { - @Override - public T call(TNetworkAddress address, int timeoutMs, boolean useFramedTransport, - FeCall call) throws Exception { - return callFe(address, timeoutMs, useFramedTransport, call); - } - }, FETCH_BLOCK_ID_RETRY_SLEEP_MS); + this(masterAddress, rpcTimeoutMs, thriftServerType, new ReusableRpcExecutor(), + FETCH_BLOCK_ID_RETRY_SLEEP_MS); } MaxComputeFeClient(TNetworkAddress masterAddress, int rpcTimeoutMs, String thriftServerType, @@ -102,7 +98,13 @@ long requestBlockId(long txnId, String writeSessionId) throws IOException { handleBlockIdResult(result, requestAddress, retryTimes, txnId, writeSessionId)); } - private R callWithMasterRedirect(String operation, FeCall call, ResponseHandler handler) + @Override + public synchronized void close() { + rpcExecutor.close(); + } + + private synchronized R callWithMasterRedirect(String operation, FeCall call, + ResponseHandler handler) throws IOException { validateAddress(masterAddress); @@ -114,6 +116,7 @@ private R callWithMasterRedirect(String operation, FeCall call, Respon result = rpcExecutor.call(requestAddress, rpcTimeoutMs, useFramedTransport(), call); } catch (Exception e) { lastException = e; + rpcExecutor.close(); LOG.warn("Failed to " + operation + ", rpc failure, retry_time=" + retryTimes + ", fe=" + formatAddress(requestAddress), e); sleepBeforeRetry(); @@ -125,6 +128,7 @@ private R callWithMasterRedirect(String operation, FeCall call, Respon } catch (NotMasterException e) { masterAddress = copyAddress(e.masterAddress); lastException = e; + rpcExecutor.close(); sleepBeforeRetry(); } } @@ -199,17 +203,10 @@ private void sleepBeforeRetry() throws IOException { } } - private static T callFe(TNetworkAddress address, int timeoutMs, - boolean useFramedTransport, FeCall call) throws Exception { + private static TTransport createTransport(TNetworkAddress address, int timeoutMs, + boolean useFramedTransport) throws TTransportException { TSocket socket = new TSocket(address.getHostname(), address.getPort(), timeoutMs); - TTransport transport = useFramedTransport ? new TFramedTransport(socket) : socket; - try { - transport.open(); - FrontendService.Client client = new FrontendService.Client(new TBinaryProtocol(transport)); - return call.call(client); - } finally { - transport.close(); - } + return useFramedTransport ? new TFramedTransport(socket) : socket; } private static void validateAddress(TNetworkAddress address) throws IOException { @@ -239,6 +236,12 @@ private static TNetworkAddress copyAddress(TNetworkAddress address) { return new TNetworkAddress(address.getHostname(), address.getPort()); } + private static boolean sameAddress(TNetworkAddress left, TNetworkAddress right) { + return left != null && right != null + && Objects.equals(left.getHostname(), right.getHostname()) + && left.getPort() == right.getPort(); + } + private static String formatAddress(TNetworkAddress address) { if (address == null) { return "null"; @@ -249,6 +252,9 @@ private static String formatAddress(TNetworkAddress address) { interface RpcExecutor { T call(TNetworkAddress address, int timeoutMs, boolean useFramedTransport, FeCall call) throws Exception; + + default void close() { + } } interface FeCall { @@ -259,6 +265,57 @@ private interface ResponseHandler { R handle(T result, TNetworkAddress requestAddress, int retryTimes) throws IOException, NotMasterException; } + private static class ReusableRpcExecutor implements RpcExecutor { + private TNetworkAddress connectedAddress; + private boolean connectedFramedTransport; + private TTransport transport; + private FrontendService.Client client; + + @Override + public synchronized T call(TNetworkAddress address, int timeoutMs, boolean useFramedTransport, + FeCall call) throws Exception { + ensureConnected(address, timeoutMs, useFramedTransport); + try { + return call.call(client); + } catch (Exception e) { + close(); + throw e; + } + } + + @Override + public synchronized void close() { + if (transport != null) { + transport.close(); + } + transport = null; + client = null; + connectedAddress = null; + } + + private void ensureConnected(TNetworkAddress address, int timeoutMs, boolean useFramedTransport) + throws Exception { + if (client != null && transport != null && transport.isOpen() + && connectedFramedTransport == useFramedTransport + && sameAddress(connectedAddress, address)) { + return; + } + + close(); + TTransport newTransport = createTransport(address, timeoutMs, useFramedTransport); + try { + newTransport.open(); + transport = newTransport; + client = new FrontendService.Client(new TBinaryProtocol(transport)); + connectedAddress = copyAddress(address); + connectedFramedTransport = useFramedTransport; + } catch (Exception e) { + newTransport.close(); + throw e; + } + } + } + private static class NotMasterException extends Exception { private final TNetworkAddress masterAddress; diff --git a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java index c4d6fab8814ac0..9788184057ee74 100644 --- a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java +++ b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java @@ -919,6 +919,8 @@ public void close() throws IOException { String errorMsg = "Failed to close MaxCompute arrow writer"; LOG.error(errorMsg, e); throw new IOException(errorMsg, e); + } finally { + feClient.close(); } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index e42890f019755f..c9506eea58bd6a 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2151,7 +2151,7 @@ public class Config extends ConfigBase { /** * Maximum number of MaxCompute Storage API write block IDs that can be allocated in one write session. */ - @ConfField(mutable = true, masterOnly = true, description = { + @ConfField(mutable = false, masterOnly = true, description = { "Maximum number of MaxCompute Storage API write block IDs that can be allocated in one write session."}) public static long max_compute_write_max_block_count = 20000L; From 8397d0be8d7066d1631dadbad0eb35aba1f7b4a5 Mon Sep 17 00:00:00 2001 From: daidai Date: Wed, 29 Apr 2026 11:26:29 +0800 Subject: [PATCH 3/3] fix commet 2 --- .../doris/maxcompute/MaxComputeFeClient.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java index b7f9f1c7489ab4..82b58f48493dda 100644 --- a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java +++ b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java @@ -65,11 +65,8 @@ static MaxComputeFeClient create(Map params) { int timeoutMs = Integer.parseInt(params.getOrDefault(FE_RPC_TIMEOUT_MS, String.valueOf(DEFAULT_FE_RPC_TIMEOUT_MS))); String serverType = params.getOrDefault(FE_THRIFT_SERVER_TYPE, THREAD_POOL); - return new MaxComputeFeClient(new TNetworkAddress(host, port), timeoutMs, serverType); - } - - MaxComputeFeClient(TNetworkAddress masterAddress, int rpcTimeoutMs, String thriftServerType) { - this(masterAddress, rpcTimeoutMs, thriftServerType, new ReusableRpcExecutor(), + return new MaxComputeFeClient(new TNetworkAddress(host, port), timeoutMs, serverType, + new ReusableRpcExecutor(), FETCH_BLOCK_ID_RETRY_SLEEP_MS); } @@ -176,9 +173,11 @@ private long handleBlockIdResult(TMaxComputeBlockIdResult result, TNetworkAddres + result.getLength() + ", txn_id=" + txnId + ", write_session_id=" + writeSessionId); } - LOG.info("Allocated MaxCompute block_id from FE@" + formatAddress(requestAddress) - + ", txn_id=" + txnId + ", write_session_id=" + writeSessionId - + ", block_id=" + result.getStart()); + if (LOG.isDebugEnabled()) { + LOG.debug("Allocated MaxCompute block_id from FE@" + formatAddress(requestAddress) + + ", txn_id=" + txnId + ", write_session_id=" + writeSessionId + + ", block_id=" + result.getStart()); + } return result.getStart(); }