Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "format/transformer/vjni_format_transformer.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/uid_util.h"

Expand Down Expand Up @@ -98,6 +99,7 @@ Status VMCTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {

std::map<std::string, std::string> VMCTableWriter::_build_base_writer_params() {
auto params = _mc_sink.properties;
const auto& master_fe_addr = _state->exec_env()->cluster_info()->master_fe_addr;
if (_mc_sink.__isset.endpoint) params["endpoint"] = _mc_sink.endpoint;
if (_mc_sink.__isset.project) params["project"] = _mc_sink.project;
if (_mc_sink.__isset.table_name) params["table"] = _mc_sink.table_name;
Expand All @@ -117,6 +119,10 @@ std::map<std::string, std::string> VMCTableWriter::_build_base_writer_params() {
if (_mc_sink.__isset.retry_count) {
params["retry_count"] = std::to_string(_mc_sink.retry_count);
}
params["fe_host"] = master_fe_addr.hostname;
params["fe_port"] = std::to_string(master_fe_addr.port);
params["fe_rpc_timeout_ms"] = std::to_string(config::thrift_rpc_timeout_ms);
params["fe_thrift_server_type"] = config::thrift_server_type_of_fe;
return params;
}

Expand Down
4 changes: 0 additions & 4 deletions be/src/util/jni-util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,17 +405,13 @@ Status Util::_init_register_natives() {
static char memory_alloc_batch_sign[] = "([I)[J";
static char memory_free_batch_name[] = "memoryTrackerFreeBatch";
static char memory_free_batch_sign[] = "([J)V";
static char request_mc_block_id_name[] = "requestMaxComputeBlockId";
static char request_mc_block_id_sign[] = "(JLjava/lang/String;)J";
static JNINativeMethod java_native_methods[] = {
{memory_alloc_name, memory_alloc_sign, (void*)&JavaNativeMethods::memoryMalloc},
{memory_free_name, memory_free_sign, (void*)&JavaNativeMethods::memoryFree},
{memory_alloc_batch_name, memory_alloc_batch_sign,
(void*)&JavaNativeMethods::memoryMallocBatch},
{memory_free_batch_name, memory_free_batch_sign,
(void*)&JavaNativeMethods::memoryFreeBatch},
{request_mc_block_id_name, request_mc_block_id_sign,
(void*)&JavaNativeMethods::requestMaxComputeBlockId},
};

int res = env->RegisterNatives(local_jni_native_exc_cl, java_native_methods,
Expand Down
129 changes: 0 additions & 129 deletions be/src/util/jni_native_method.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,119 +17,14 @@

#include "util/jni_native_method.h"

#include <gen_cpp/FrontendService.h>
#include <glog/logging.h>

#include <chrono>
#include <cstdlib>
#include <thread>
#include <vector>

#include "common/status.h"
#include "jni.h"
#include "runtime/exec_env.h"
#include "util/client_cache.h"
#include "util/defer_op.h"
#include "util/thrift_rpc_helper.h"

namespace doris {

namespace {

void throw_java_runtime_exception(JNIEnv* env, const std::string& message) {
jclass exception_cl = env->FindClass("java/lang/IllegalStateException");
if (exception_cl != nullptr) {
env->ThrowNew(exception_cl, message.c_str());
env->DeleteLocalRef(exception_cl);
}
}

Result<int64_t> request_maxcompute_block_id_from_fe(int64_t txn_id,
const std::string& write_session_id) {
if (txn_id <= 0) {
return ResultError(Status::InvalidArgument(
"invalid MaxCompute txn_id for block_id allocation: {}", txn_id));
}
if (write_session_id.empty()) {
return ResultError(Status::InvalidArgument(
"empty MaxCompute write_session_id for block_id allocation"));
}

constexpr uint32_t FETCH_BLOCK_ID_MAX_RETRY_TIMES = 3;
TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
for (uint32_t retry_times = 0; retry_times < FETCH_BLOCK_ID_MAX_RETRY_TIMES; retry_times++) {
TMaxComputeBlockIdRequest request;
TMaxComputeBlockIdResult result;
request.__set_txn_id(txn_id);
request.__set_write_session_id(write_session_id);
request.__set_length(1);

Status rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
client->getMaxComputeBlockIdRange(result, request);
});

if (!rpc_status.ok()) {
LOG(WARNING) << "Failed to allocate MaxCompute block_id, rpc failure, retry_time="
<< retry_times << ", txn_id=" << txn_id
<< ", write_session_id=" << write_session_id << ", status=" << rpc_status;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}

if (!result.__isset.status) {
return ResultError(Status::RpcError(
"failed to allocate MaxCompute block_id from FE, missing status in response, "
"txn_id={}, write_session_id={}",
txn_id, write_session_id));
}

Status fe_status = Status::create<false>(result.status);
if (fe_status.is<ErrorCode::NOT_MASTER>()) {
if (!result.__isset.master_address) {
return ResultError(Status::RpcError(
"failed to allocate MaxCompute block_id from FE, missing master address "
"in NOT_MASTER response, txn_id={}, write_session_id={}",
txn_id, write_session_id));
}
LOG(WARNING) << "Failed to allocate MaxCompute block_id, requested non-master FE@"
<< master_addr.hostname << ":" << master_addr.port << ", switch to FE@"
<< result.master_address.hostname << ":" << result.master_address.port
<< ", retry_time=" << retry_times << ", txn_id=" << txn_id
<< ", write_session_id=" << write_session_id;
master_addr = result.master_address;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}

if (!fe_status.ok()) {
LOG(WARNING) << "Failed to allocate MaxCompute block_id, FE returned error, retry_time="
<< retry_times << ", txn_id=" << txn_id
<< ", write_session_id=" << write_session_id << ", status=" << fe_status;
return ResultError(std::move(fe_status));
}

if (result.length != 1) {
return ResultError(Status::RpcError(
"failed to allocate MaxCompute block_id from FE, expected length=1 but got "
"{}, txn_id={}, write_session_id={}",
result.length, txn_id, write_session_id));
}

LOG(INFO) << "Allocated MaxCompute block_id from FE@" << master_addr.hostname << ":"
<< master_addr.port << ", txn_id=" << txn_id
<< ", write_session_id=" << write_session_id << ", block_id=" << result.start;
return result.start;
}

return ResultError(Status::RpcError(
"failed to allocate MaxCompute block_id from FE, txn_id={}, write_session_id={}",
txn_id, write_session_id));
}

} // namespace

jlong JavaNativeMethods::memoryMalloc(JNIEnv* env, jclass clazz, jlong bytes) {
return reinterpret_cast<long>(malloc(bytes));
}
Expand Down Expand Up @@ -209,28 +104,4 @@ void JavaNativeMethods::memoryFreeBatch(JNIEnv* env, jclass clazz, jlongArray ad
env->ReleaseLongArrayElements(addrs, elems, JNI_ABORT);
}

jlong JavaNativeMethods::requestMaxComputeBlockId(JNIEnv* env, jclass clazz, jlong txn_id,
jstring write_session_id) {
if (write_session_id == nullptr) {
throw_java_runtime_exception(
env, "MaxCompute write_session_id is null when requesting block_id");
return 0;
}

const char* write_session_id_chars = env->GetStringUTFChars(write_session_id, nullptr);
if (write_session_id_chars == nullptr) {
throw_java_runtime_exception(env, "Failed to read MaxCompute write_session_id from Java");
return 0;
}
std::string write_session_id_str(write_session_id_chars);
env->ReleaseStringUTFChars(write_session_id, write_session_id_chars);

auto block_id = request_maxcompute_block_id_from_fe(txn_id, write_session_id_str);
if (!block_id.has_value()) {
throw_java_runtime_exception(env, block_id.error().to_string());
return 0;
}
return static_cast<jlong>(block_id.value());
}

} // namespace doris
4 changes: 0 additions & 4 deletions be/src/util/jni_native_method.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ struct JavaNativeMethods {

// Batch free multiple addresses; addrs is a long[]
static void memoryFreeBatch(JNIEnv* env, jclass clazz, jlongArray addrs);

// Request a MaxCompute block id from FE via BE.
static jlong requestMaxComputeBlockId(JNIEnv* env, jclass clazz, jlong txn_id,
jstring write_session_id);
};

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,4 @@ public class JNINativeMethod {
*/
public static native void memoryTrackerFreeBatch(long[] addrs);

/**
* Request a MaxCompute block id from BE, which will forward the request to FE.
*/
public static native long requestMaxComputeBlockId(long txnId, String writeSessionId);
}
6 changes: 6 additions & 0 deletions fe/be-java-extensions/max-compute-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>fe-thrift</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
Expand Down
Loading
Loading