[fix](multi-table-load) fix be core when partial table load failed (#…
sollhui committed May 20, 2024
1 parent 27c75b3 commit 1000e5b
Showing 4 changed files with 199 additions and 6 deletions.
6 changes: 5 additions & 1 deletion be/src/io/fs/multi_table_pipe.cpp
@@ -32,6 +32,7 @@
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "util/debug_points.h"
#include "util/thrift_rpc_helper.h"
#include "util/thrift_util.h"
#include "util/time.h"
@@ -226,8 +227,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
<< ", ctx: " << _ctx->brief();
_unplanned_pipes.clear();

_inflight_cnt += params.size();
for (auto& plan : params) {
DBUG_EXECUTE_IF("MultiTablePipe.exec_plans.failed",
{ return Status::Aborted("MultiTablePipe.exec_plans.failed"); });
if (!plan.__isset.table_name ||
_planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
return Status::Aborted("Missing vital param: table_name");
@@ -250,6 +252,8 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
CHECK(false);
}

_inflight_cnt++;

RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment(
plan, [this](RuntimeState* state, Status* status) {
DCHECK(state);
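
Taken together, the two hunks above replace this file's single deletion, the up-front `_inflight_cnt += params.size();`, with a per-plan `_inflight_cnt++` inside the loop, and add a `MultiTablePipe.exec_plans.failed` debug point so a test can force `exec_plans` to bail out partway through a batch. A minimal sketch of why the counter has to move, under assumed names (`dispatch_all`, `run_one`, a toy `Status`) rather than the Doris API: if the whole batch size is added before the loop and the loop returns early, the in-flight count stays ahead of the completion callbacks that are supposed to drain it.

```cpp
// Hedged sketch, not the Doris implementation: dispatch_all/run_one are made-up names.
#include <atomic>
#include <string>
#include <utility>
#include <vector>

struct Status {
    bool ok = true;
    std::string msg;
    static Status Aborted(std::string m) { return {false, std::move(m)}; }
};

std::atomic<int> inflight_cnt{0};

// Stand-in for handing a plan to the fragment manager; always succeeds in this sketch.
Status run_one(const std::string& /*plan*/) { return {}; }

Status dispatch_all(const std::vector<std::string>& plans) {
    // The buggy shape was `inflight_cnt += plans.size();` here: an early return below
    // then leaves the counter ahead of the completion callbacks that decrement it.
    for (const auto& plan : plans) {
        if (plan.empty()) {
            return Status::Aborted("missing vital param: table_name");  // nothing extra counted
        }
        inflight_cnt++;  // count the plan we are about to hand off, mirroring the per-plan increment above
        Status st = run_one(plan);
        if (!st.ok) {
            return st;
        }
    }
    return {};
}

int main() {
    dispatch_all({"plan_a", "plan_b"});
    return inflight_cnt.load() == 2 ? 0 : 1;
}
```
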
4 changes: 4 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
@@ -78,6 +78,7 @@
#include "runtime/workload_group/workload_group_manager.h"
#include "runtime/workload_management/workload_query_info.h"
#include "service/backend_options.h"
#include "util/debug_points.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
#include "util/hash_util.hpp"
@@ -855,6 +856,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
}
g_fragmentmgr_prepare_latency << (duration_ns / 1000);

DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
{ return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });

std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
params.local_params[0], params.query_id, params.query_options, &handler,
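
The only change here is an injection hook: with the `FragmentMgr.exec_plan_fragment.failed` debug point enabled, `exec_plan_fragment` aborts before the fragment is registered, which is how the regression test below simulates a planning failure on the BE. A rough sketch of the debug-point idea, assuming the usual named-flag-checked-at-runtime shape; the registry class, the `_SKETCH` macro name, and the `enable()` call are illustrative, not the real `util/debug_points.h` API:

```cpp
// Simplified stand-in for a DBUG_EXECUTE_IF-style hook; the real registry lives in
// util/debug_points.h and is toggled at runtime rather than hard-coded like this.
#include <mutex>
#include <string>
#include <unordered_set>
#include <utility>

class DebugPointRegistry {
public:
    static bool enabled(const std::string& name) {
        std::lock_guard<std::mutex> l(mu());
        return points().count(name) > 0;
    }
    static void enable(const std::string& name) {
        std::lock_guard<std::mutex> l(mu());
        points().insert(name);
    }

private:
    static std::mutex& mu() { static std::mutex m; return m; }
    static std::unordered_set<std::string>& points() {
        static std::unordered_set<std::string> s;
        return s;
    }
};

// Executes `block` only when the named point is enabled; a cheap lookup otherwise.
#define DBUG_EXECUTE_IF_SKETCH(name, block)           \
    do {                                              \
        if (DebugPointRegistry::enabled(name)) block  \
    } while (false)

struct Status {
    bool ok = true;
    std::string msg;
    static Status Aborted(std::string m) { return {false, std::move(m)}; }
};

// Usage mirroring the hunk above.
Status exec_plan_fragment_sketch() {
    DBUG_EXECUTE_IF_SKETCH("FragmentMgr.exec_plan_fragment.failed",
                           { return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
    return {};
}

int main() {
    DebugPointRegistry::enable("FragmentMgr.exec_plan_fragment.failed");
    return exec_plan_fragment_sketch().ok ? 1 : 0;  // injected failure expected
}
```
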
34 changes: 29 additions & 5 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -324,6 +324,20 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
} \
} while (false);

#define HANDLE_MULTI_TABLE_ERROR(stmt, err_msg)                                \
    do {                                                                       \
        Status _status_ = (stmt);                                              \
        if (UNLIKELY(!_status_.ok() && !_status_.is<PUBLISH_TIMEOUT>())) {     \
            err_handler(ctx, _status_, err_msg);                               \
            cb(ctx);                                                           \
            _status_ = ctx->future.get();                                      \
            if (!_status_.ok()) {                                              \
                LOG(ERROR) << "failed to get future, " << ctx->brief();        \
            }                                                                  \
            return;                                                            \
        }                                                                      \
    } while (false);

LOG(INFO) << "begin to execute routine load task: " << ctx->brief();

// create data consumer group
@@ -378,17 +392,27 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);

// start to consume, this may block a while
HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");

if (ctx->is_multi_table) {
Status st;
// plan the rest of unplanned data
auto multi_table_pipe = std::static_pointer_cast<io::MultiTablePipe>(ctx->body_sink);
HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(),
"multi tables task executes plan error");
// start to consume, this may block a while
st = consumer_grp->start_all(ctx, kafka_pipe);
if (!st.ok()) {
multi_table_pipe->handle_consume_finished();
HANDLE_MULTI_TABLE_ERROR(st, "consuming failed");
}
st = multi_table_pipe->request_and_exec_plans();
if (!st.ok()) {
multi_table_pipe->handle_consume_finished();
HANDLE_MULTI_TABLE_ERROR(st, "multi tables task executes plan error");
}
// need memory order
multi_table_pipe->handle_consume_finished();
HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
} else {
// start to consume, this may block a while
HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
}

// wait for all consumers finished
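
In the multi-table branch the consume step now happens inside `if (ctx->is_multi_table)`, and when either consuming or `request_and_exec_plans()` fails, `handle_consume_finished()` is called before `HANDLE_MULTI_TABLE_ERROR` reports the error, waits on `ctx->future`, and returns. A hedged sketch of the promise/future hand-off the macro relies on, with `LoadContext` and `finish_cb` as illustrative stand-ins for the real context and callback types:

```cpp
// Hedged sketch of the completion hand-off in HANDLE_MULTI_TABLE_ERROR: the load
// context carries a promise/future pair, the finish callback (cb(ctx) in the macro)
// fulfils the promise, and the error path then waits on the future, presumably so
// exec_task does not return while the plan is still being torn down.
#include <future>
#include <iostream>

struct LoadContext {
    std::promise<int> promise;
    std::future<int> future = promise.get_future();
};

void finish_cb(LoadContext& ctx) {
    // In the real code this fires once the load finishes or is cancelled; here we
    // complete immediately so the sketch terminates.
    ctx.promise.set_value(0);
}

int main() {
    LoadContext ctx;
    finish_cb(ctx);              // cb(ctx) in the macro
    int rc = ctx.future.get();   // ctx->future.get(): wait for teardown before returning
    std::cout << "teardown rc=" << rc << std::endl;
    return 0;
}
```
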
@@ -0,0 +1,161 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.ProducerConfig
import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_multi_table_load_eror","nonConcurrent") {
def kafkaCsvTpoics = [
"multi_table_csv",
]

String enabled = context.config.otherConfigs.get("enableKafkaTest")
String kafka_port = context.config.otherConfigs.get("kafka_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
def kafka_broker = "${externalEnvIp}:${kafka_port}"

if (enabled != null && enabled.equalsIgnoreCase("true")) {
// define kafka
def props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// Create kafka producer
def producer = new KafkaProducer<>(props)

for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
lines.each { line ->
logger.info("=====${line}========")
def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
producer.send(record)
}
}
}

    def load_with_injection = { injection ->
        def jobName = "test_multi_table_load_eror"
        def tableName = "dup_tbl_basic_multi_table"
        if (enabled != null && enabled.equalsIgnoreCase("true")) {
            try {
                GetDebugPoint().enableDebugPointForAllBEs(injection)
                sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
                sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
                sql "sync"

                sql """
                    CREATE ROUTINE LOAD ${jobName}
                    COLUMNS TERMINATED BY "|"
                    PROPERTIES
                    (
                        "max_batch_interval" = "5",
                        "max_batch_rows" = "300000",
                        "max_batch_size" = "209715200"
                    )
                    FROM KAFKA
                    (
                        "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
                        "kafka_topic" = "multi_table_csv",
                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                    );
                """
                sql "sync"

                def count = 0
                while (true) {
                    sleep(1000)
                    def res = sql "show routine load for ${jobName}"
                    def state = res[0][8].toString()
                    if (state != "RUNNING") {
                        count++
                        if (count > 60) {
                            assertEquals(1, 2)
                        }
                        continue;
                    }
                    log.info("reason of state changed: ${res[0][17].toString()}".toString())
                    break;
                }

                count = 0
                while (true) {
                    sleep(1000)
                    def res = sql "show routine load for ${jobName}"
                    def state = res[0][8].toString()
                    if (state == "RUNNING") {
                        count++
                        if (count > 60) {
                            GetDebugPoint().disableDebugPointForAllBEs(injection)
                            break;
                        }
                        continue;
                    }
                    log.info("reason of state changed: ${res[0][17].toString()}".toString())
                    assertEquals(1, 2)
                }

                count = 0
                while (true) {
                    sleep(1000)
                    def res = sql "show routine load for ${jobName}"
                    log.info("routine load statistic: ${res[0][14].toString()}".toString())
                    def json = parseJson(res[0][14])
                    if (json.loadedRows.toString() == "0") {
                        count++
                        if (count > 60) {
                            assertEquals(1, 2)
                        }
                        continue;
                    }
                    break;
                }
            } finally {
                // make sure the injection does not leak into later runs even if an assertion fired
                GetDebugPoint().disableDebugPointForAllBEs(injection)
                sql "stop routine load for ${jobName}"
                sql "DROP TABLE IF EXISTS ${tableName}"
            }
        }
    }

load_with_injection("FragmentMgr.exec_plan_fragment.failed")
load_with_injection("MultiTablePipe.exec_plans.failed")
}
