diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 28df937479be30..aeff2f42fa3ca8 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -32,6 +32,7 @@
 #include "runtime/fragment_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
+#include "util/debug_points.h"
 #include "util/thrift_rpc_helper.h"
 #include "util/thrift_util.h"
 #include "util/time.h"
@@ -226,8 +227,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
               << ", ctx: " << _ctx->brief();
     _unplanned_pipes.clear();
 
-    _inflight_cnt += params.size();
     for (auto& plan : params) {
+        DBUG_EXECUTE_IF("MultiTablePipe.exec_plans.failed",
+                        { return Status::Aborted("MultiTablePipe.exec_plans.failed"); });
         if (!plan.__isset.table_name ||
             _planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
             return Status::Aborted("Missing vital param: table_name");
@@ -250,6 +252,8 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
             CHECK(false);
         }
 
+        _inflight_cnt++;
+
         RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment(
                 plan, [this](RuntimeState* state, Status* status) {
                     DCHECK(state);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index d910dfe97d59a1..c37ef90c618b12 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -78,6 +78,7 @@
 #include "runtime/workload_group/workload_group_manager.h"
 #include "runtime/workload_management/workload_query_info.h"
 #include "service/backend_options.h"
+#include "util/debug_points.h"
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
 #include "util/hash_util.hpp"
@@ -855,6 +856,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
     }
     g_fragmentmgr_prepare_latency << (duration_ns / 1000);
 
+    DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
+                    { return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
+
     std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
     RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
             params.local_params[0], params.query_id, params.query_options, &handler,
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 82cea2a2bcabbd..513bce52c8c715 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -324,6 +324,20 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
         }                                                                  \
     } while (false);
 
+#define HANDLE_MULTI_TABLE_ERROR(stmt, err_msg)                            \
+    do {                                                                   \
+        Status _status_ = (stmt);                                          \
+        if (UNLIKELY(!_status_.ok() && !_status_.is<PUBLISH_TIMEOUT>())) { \
+            err_handler(ctx, _status_, err_msg);                           \
+            cb(ctx);                                                       \
+            _status_ = ctx->future.get();                                  \
+            if (!_status_.ok()) {                                          \
+                LOG(ERROR) << "failed to get future, " << ctx->brief();    \
+            }                                                              \
+            return;                                                        \
+        }                                                                  \
+    } while (false);
+
     LOG(INFO) << "begin to execute routine load task: " << ctx->brief();
 
     // create data consumer group
@@ -378,17 +392,27 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
         std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
                 std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
 
-        // start to consume, this may block a while
-        HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
-
         if (ctx->is_multi_table) {
+            Status st;
             // plan the rest of unplanned data
             auto multi_table_pipe = std::static_pointer_cast<io::MultiTablePipe>(ctx->body_sink);
-            HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(),
-                         "multi tables task executes plan error");
+            // start to consume, this may block a while
+            st = consumer_grp->start_all(ctx, kafka_pipe);
+            if (!st.ok()) {
+                multi_table_pipe->handle_consume_finished();
+                HANDLE_MULTI_TABLE_ERROR(st, "consuming failed");
+            }
+            st = multi_table_pipe->request_and_exec_plans();
+            if (!st.ok()) {
+                multi_table_pipe->handle_consume_finished();
+                HANDLE_MULTI_TABLE_ERROR(st, "multi tables task executes plan error");
+            }
             // need memory order
             multi_table_pipe->handle_consume_finished();
             HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
+        } else {
+            // start to consume, this may block a while
+            HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
         }
 
         // wait for all consumers finished
diff --git a/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
new file mode 100644
index 00000000000000..7217109c3537e8
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_multi_table_load_error", "nonConcurrent") {
+    def kafkaCsvTopics = [
+                  "multi_table_csv",
+              ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    def load_with_injection = { injection ->
+        def jobName = "test_multi_table_load_error"
+        def tableName = "dup_tbl_basic_multi_table"
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+                sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
+                sql "sync"
+
+                sql """
+                    CREATE ROUTINE LOAD ${jobName}
+                    COLUMNS TERMINATED BY "|"
+                    PROPERTIES
+                    (
+                        "max_batch_interval" = "5",
+                        "max_batch_rows" = "300000",
+                        "max_batch_size" = "209715200"
+                    )
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                        "kafka_topic" = "multi_table_csv",
+                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                    );
+                """
+                sql "sync"
+
+                def count = 0
+                while (true) {
+                    sleep(1000)
+                    def res = sql "show routine load for ${jobName}"
+                    def state = res[0][8].toString()
+                    if (state != "RUNNING") {
+                        count++
+                        if (count > 60) {
+                            assertEquals(1, 2)
+                        }
+                        continue;
+                    }
+                    log.info("reason of state changed: ${res[0][17].toString()}".toString())
+                    break;
+                }
+
+                count = 0
+                while (true) {
+                    sleep(1000)
+                    def res = sql "show routine load for ${jobName}"
+                    def state = res[0][8].toString()
+                    if (state == "RUNNING") {
+                        count++
+                        if (count > 60) {
+                            GetDebugPoint().disableDebugPointForAllBEs(injection)
+                            break;
+                        }
+                        continue;
+                    }
+                    log.info("reason of state changed: ${res[0][17].toString()}".toString())
+                    assertEquals(1, 2)
+                }
+
+                count = 0
+                while (true) {
+                    sleep(1000)
+                    def res = sql "show routine load for ${jobName}"
+                    log.info("routine load statistic: ${res[0][14].toString()}".toString())
+                    def json = parseJson(res[0][14])
+                    if (json.loadedRows.toString() == "0") {
+                        count++
+                        if (count > 60) {
+                            assertEquals(1, 2)
+                        }
+                        continue;
+                    }
+                    break;
+                }
+            } finally {
+                // make sure the injected failure is cleared even if the test aborts early
+                GetDebugPoint().disableDebugPointForAllBEs(injection)
+                sql "stop routine load for ${jobName}"
+                sql "DROP TABLE IF EXISTS ${tableName}"
+            }
+        }
+    }
+
+    load_with_injection("FragmentMgr.exec_plan_fragment.failed")
+    load_with_injection("MultiTablePipe.exec_plans.failed")
+}
\ No newline at end of file