Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
178b3db
[Enhancement] BE webserver support SSL #22971
LuGuangming Aug 14, 2023
098f1ba
[feature](load) refactor CSV reading process during scanning, and sup…
Hastyshell Aug 15, 2023
3175425
[Chore](docs)Add SSL Faq (#22956)
irenesrl Aug 15, 2023
68caed6
[bug](if) fix if function not handle const nullable value (#22823)
zhangstar333 Aug 15, 2023
28a7a6c
[fix](datastream sender) fix wrong result of BUCKET_SHFFULE_HASH_PART…
jacktengg Aug 15, 2023
c143683
[fix](createTableStmt)fix bug that createTableStmt toSql (#22750)
hubgeter Aug 15, 2023
321f6bb
[Chore](excution) change some log fatal and dcheck to exception (#22890)
BiteTheDDDDt Aug 15, 2023
0dee43c
[Bug](aggregation)fix for map_agg when columns[1] is nullable (#22932)
xingyingone Aug 15, 2023
413d1c5
[Fix](inverted index) fix non-index match function core (#22959)
airborne12 Aug 15, 2023
f138695
[refactor](load) change segcompaction worker interface (#22928)
kaijchen Aug 15, 2023
cdffb5e
[fix](Nereids) type check could not work when root node is table or f…
morrySnow Aug 15, 2023
8f0e297
[Bug](exchange) init _instance_to_rpc_ctx on register_sink (#22976)
BiteTheDDDDt Aug 15, 2023
9548e86
[FIX](map)insert into doris table with array/map type by local tvf (#…
Aug 15, 2023
06dec46
[chore](Nereids): remove useless code (#22960)
jackwener Aug 15, 2023
804c4a6
[fix](analysis) fix error msg #22950
LemonLiTree Aug 15, 2023
d35cc3f
[performance](executor) optimize time_round function only one arg (#2…
Mryange Aug 15, 2023
ca4a3ae
[refactor](parquet)change decimal type export as fixed-len-byte on pa…
zhangstar333 Aug 15, 2023
612a67a
[fix](load) expose error root cause msg for load (#22968)
freemandealer Aug 15, 2023
c0935fb
[feature](agg) Make 'map_agg' support array type as value (#22945)
mrhhsg Aug 15, 2023
41d66f4
[fix](function) fix str_to_date with specific format #22981
Mryange Aug 15, 2023
f895b36
[feature](auth)support Col auth (#22629)
zddr Aug 15, 2023
2bd46c5
[Enhancement](multi-catalog) add a BE selection strategy for hdfs sho…
dutyu Aug 15, 2023
bdbf5fe
[Enhancement] BE webserver support SSL #22971
LuGuangming Aug 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/cmake/thirdparty.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ endif()

add_thirdparty(libevent LIBNAME "lib/libevent.a")
add_thirdparty(libevent_pthreads LIBNAME "lib/libevent_pthreads.a")
add_thirdparty(libevent_openssl LIBNAME "lib/libevent_openssl.a")
add_thirdparty(libbz2 LIBNAME "lib/libbz2.a")
add_thirdparty(libz LIBNAME "lib/libz.a")
add_thirdparty(crypto)
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/line_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace doris {
namespace io {
class IOContext;
}
// This class is used for CSV scanner, to read content line by line
// This class is used to read content line by line
class LineReader {
public:
virtual ~LineReader() = default;
Expand Down
5 changes: 4 additions & 1 deletion be/src/exec/text_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ TextConverter::TextConverter(char escape_char, char collection_delimiter, char m

void TextConverter::write_string_column(const SlotDescriptor* slot_desc,
vectorized::MutableColumnPtr* column_ptr, const char* data,
size_t len) {
size_t len, bool need_escape) {
DCHECK(column_ptr->get()->is_nullable());
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr->get());
if (need_escape) {
unescape_string_on_spot(data, &len);
}
if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) {
nullable_column->get_null_map_data().push_back(1);
reinterpret_cast<vectorized::ColumnString&>(nullable_column->get_nested_column())
Expand Down
14 changes: 11 additions & 3 deletions be/src/exec/text_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#pragma once

#include <stddef.h>
#include <cstddef>

#include "vec/columns/column.h"

Expand All @@ -33,9 +33,15 @@ class TextConverter {

TextConverter(char escape_char, char collection_delimiter = '\2', char map_kv_delimiter = '\3');

inline void write_string_column(const SlotDescriptor* slot_desc,
vectorized::MutableColumnPtr* column_ptr, const char* data,
size_t len) {
return write_string_column(slot_desc, column_ptr, data, len, false);
}

void write_string_column(const SlotDescriptor* slot_desc,
vectorized::MutableColumnPtr* column_ptr, const char* data,
size_t len);
vectorized::MutableColumnPtr* column_ptr, const char* data, size_t len,
bool need_escape);

inline bool write_column(const SlotDescriptor* slot_desc,
vectorized::MutableColumnPtr* column_ptr, const char* data, size_t len,
Expand All @@ -62,6 +68,8 @@ class TextConverter {
}
void set_map_kv_delimiter(char mapkv_delimiter) { _map_kv_delimiter = mapkv_delimiter; }

inline void set_escape_char(const char escape) { this->_escape_char = escape; }

private:
bool _write_data(const TypeDescriptor& type_desc, vectorized::IColumn* nullable_col_ptr,
const char* data, size_t len, bool copy_string, bool need_escape, size_t rows,
Expand Down
6 changes: 6 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,12 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
}
if (!http_req->header(HTTP_ENCLOSE).empty() && http_req->header(HTTP_ENCLOSE).size() > 0) {
request.__set_enclose(http_req->header(HTTP_ENCLOSE)[0]);
}
if (!http_req->header(HTTP_ESCAPE).empty() && http_req->header(HTTP_ESCAPE).size() > 0) {
request.__set_escape(http_req->header(HTTP_ESCAPE)[0]);
}
if (!http_req->header(HTTP_PARTITIONS).empty()) {
request.__set_partitions(http_req->header(HTTP_PARTITIONS));
request.__set_isTempPartition(false);
Expand Down
64 changes: 63 additions & 1 deletion be/src/http/ev_http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,40 @@ EvHttpServer::~EvHttpServer() {
}
}

void EvHttpServer::_init_ssl() {
SSL_library_init();
SSL_load_error_strings();
OpenSSL_add_all_algorithms();
m_ctx = SSL_CTX_new(SSLv32_server_method());
SSL_CTX_set_options(m_ctx,
SSL_OP_SINGLE_DH_USE |
SSL_OP_SINGLE_ECDH_USE |
SSL_OP_NO_SSLv2);
m_ecdh = EC_KEY_new_by_curve_name (NID_X9_62_prime256v1);
if (!m_ecdh) {
LOG(WARNING) << "EC_KEY_new_by_curve_name fail";
return;
}

if(1 != SSL_CTX_set_tmp_ecdh(m_ctx, m_ecdh)) {
LOG(WARNING) << "SSL_CTX_set_tmp_ecdh fail";
return;
}
int ret = _server_set_certs();
CHECK(ret >= 0) << "SetCerts failed code=" << ret;
}

static struct bufferevent* bevcb(struct event_base *base, void *arg) {
struct bufferevent* r;
SSL_CTX *ctx = (SSL_CTX *)arg;
r = bufferevent_openssl_socket_new(base,
-1,
SSL_new(ctx),
BUFFEREVENT_SSL_ACCEPTING,
BEV_OPT_CLOSE_ON_FREE);
return r;
}

void EvHttpServer::start() {
_started = true;
// bind to
Expand All @@ -109,6 +143,10 @@ void EvHttpServer::start() {
.build(&_workers);

evthread_use_pthreads();
if (config::enable_https) {
LOG(INFO) << "BE WebServer using https";
_init_ssl();
}
_event_bases.resize(_num_workers);
for (int i = 0; i < _num_workers; ++i) {
CHECK(_workers->submit_func([this, i]() {
Expand All @@ -125,7 +163,9 @@ void EvHttpServer::start() {
std::shared_ptr<evhttp> http(evhttp_new(base.get()),
[](evhttp* http) { evhttp_free(http); });
CHECK(http != nullptr) << "Couldn't create an evhttp.";

if (config::enable_https) {
evhttp_set_bevcb(http.get(), bevcb, m_ctx);
}
auto res = evhttp_accept_socket(http.get(), _server_fd);
CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;

Expand All @@ -138,6 +178,22 @@ void EvHttpServer::start() {
}
}

int EvHttpServer::_server_set_certs() {
if (1 != SSL_CTX_use_certificate_chain_file(m_ctx, config::ssl_certificate_path.c_str())) {
LOG(WARNING) << "SSL_CTX_use_certificate_chain_file fail";
return -1;
}
if (1 != SSL_CTX_use_PrivateKey_file(m_ctx, config::ssl_private_key_path.c_str(), SSL_FILETYPE_PEM)) {
LOG(WARNING) << "SSL_CTX_use_PrivateKey_file fail";
return -2;
}
if (1 != SSL_CTX_check_private_key(m_ctx)) {
LOG(WARNING) << "SSL_CTX_check_private_key fail";
return -3;
}
return 0;
}

void EvHttpServer::stop() {
{
std::lock_guard<std::mutex> lock(_event_bases_lock);
Expand All @@ -146,6 +202,12 @@ void EvHttpServer::stop() {
}
_event_bases.clear();
}
if (m_ctx != NULL) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!= nullptr

SSL_CTX_free(m_ctx);
}
if (m_ecdh != NULL) {
EC_KEY_free(m_ecdh);
}
_workers->shutdown();
close(_server_fd);
_started = false;
Expand Down
11 changes: 10 additions & 1 deletion be/src/http/ev_http_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef EVENT__HAVE_OPENSSL
#define EVENT__HAVE_OPENSSL
#endif
#pragma once

#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include <openssl/ssl.h>
#include <event2/bufferevent_ssl.h>

#include "common/status.h"
#include "http/http_method.h"
#include "util/path_trie.hpp"
Expand Down Expand Up @@ -58,6 +63,8 @@ class EvHttpServer {
private:
Status _bind();
HttpHandler* _find_handler(HttpRequest* req);
void _init_ssl();
int _server_set_certs();

private:
// input param
Expand All @@ -81,6 +88,8 @@ class EvHttpServer {
PathTrie<HttpHandler*> _head_handlers;
PathTrie<HttpHandler*> _options_handlers;
bool _started = false;
SSL_CTX* m_ctx;
EC_KEY* m_ecdh;
};

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ static const std::string HTTP_COLUMNS = "columns";
static const std::string HTTP_WHERE = "where";
static const std::string HTTP_COLUMN_SEPARATOR = "column_separator";
static const std::string HTTP_LINE_DELIMITER = "line_delimiter";
static const std::string HTTP_ENCLOSE = "enclose";
static const std::string HTTP_ESCAPE = "escape";
static const std::string HTTP_MAX_FILTER_RATIO = "max_filter_ratio";
static const std::string HTTP_TIMEOUT = "timeout";
static const std::string HTTP_PARTITIONS = "partitions";
Expand Down
8 changes: 5 additions & 3 deletions be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
namespace doris {
using namespace ErrorCode;

SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) : _writer(writer) {}

Status SegcompactionWorker::_get_segcompaction_reader(
SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet,
std::shared_ptr<Schema> schema, OlapReaderStatistics* stat,
Expand Down Expand Up @@ -149,8 +151,8 @@ Status SegcompactionWorker::_delete_original_segments(uint32_t begin, uint32_t e
}

Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat,
Merger::Statistics& merger_stat, uint64_t begin,
uint64_t end) {
Merger::Statistics& merger_stat, uint32_t begin,
uint32_t end) {
uint64_t raw_rows_read = reader_stat.raw_rows_read; /* total rows read before merge */
uint64_t sum_src_row = 0; /* sum of rows in each involved source segments */
uint64_t filtered_rows = merger_stat.filtered_rows; /* rows filtered by del conditions */
Expand Down Expand Up @@ -186,7 +188,7 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
}

Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t begin, uint64_t end) {
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, uint32_t end) {
return _writer->_create_segment_writer_for_segcompaction(writer, begin, end);
}

Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/rowset/segcompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class SegcompactionWorker {
friend class BetaRowsetWriter;

public:
SegcompactionWorker(BetaRowsetWriter* writer) { _writer = writer; }
SegcompactionWorker(BetaRowsetWriter* writer);

void compact_segments(SegCompactionCandidatesSharedPtr segments);

Expand All @@ -59,7 +59,7 @@ class SegcompactionWorker {

private:
Status _create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t begin, uint64_t end);
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, uint32_t end);
Status _get_segcompaction_reader(SegCompactionCandidatesSharedPtr segments,
TabletSharedPtr tablet, std::shared_ptr<Schema> schema,
OlapReaderStatistics* stat,
Expand All @@ -70,7 +70,7 @@ class SegcompactionWorker {
uint32_t end);
Status _delete_original_segments(uint32_t begin, uint32_t end);
Status _check_correctness(OlapReaderStatistics& reader_stat, Merger::Statistics& merger_stat,
uint64_t begin, uint64_t end);
uint32_t begin, uint32_t end);
Status _do_compact_segments(SegCompactionCandidatesSharedPtr segments);

private:
Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB&
}
case FieldType::OLAP_FIELD_TYPE_MAP: {
// map reader now has 3 sub readers for key, value, offsets(scalar), null(scala)
std::unique_ptr<ColumnReader> map_reader(
new ColumnReader(opts, meta, num_rows, file_reader));
std::unique_ptr<ColumnReader> key_reader;
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(0),
meta.children_columns(0).num_rows(), file_reader,
Expand All @@ -155,6 +153,11 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB&
meta.children_columns(3).num_rows(),
file_reader, &null_reader));
}

// The num rows of the map reader equals to the num rows of the length reader.
num_rows = meta.children_columns(2).num_rows();
std::unique_ptr<ColumnReader> map_reader(
new ColumnReader(opts, meta, num_rows, file_reader));
map_reader->_sub_readers.resize(meta.children_columns_size());

map_reader->_sub_readers[0] = std::move(key_reader);
Expand Down
7 changes: 3 additions & 4 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
finst_id.set_hi(fragment_instance_id.hi);
finst_id.set_lo(fragment_instance_id.lo);
_instance_to_sending_by_pipeline[low_id] = true;
_instance_to_rpc_ctx[low_id] = {};
_instance_to_receiver_eof[low_id] = false;
_instance_to_rpc_time[low_id] = 0;
_construct_request(low_id, finst_id);
Expand Down Expand Up @@ -191,10 +192,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
auto* closure = request.channel->get_closure(id, request.eos, nullptr);

ExchangeRpcContext rpc_ctx;
rpc_ctx._closure = closure;
rpc_ctx.is_cancelled = false;
_instance_to_rpc_ctx[id] = rpc_ctx;
_instance_to_rpc_ctx[id]._closure = closure;
_instance_to_rpc_ctx[id].is_cancelled = false;

closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
if (config::exchange_sink_ignore_eovercrowded) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
_exec_status = Status::Cancelled(msg);
}
_runtime_state->set_is_cancelled(true);
_runtime_state->set_is_cancelled(true, msg);

LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;

Expand All @@ -172,7 +172,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
// For stream load the fragment's query_id == load id, it is set in FE.
auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
if (stream_load_ctx != nullptr) {
stream_load_ctx->pipe->cancel(PPlanFragmentCancelReason_Name(reason));
stream_load_ctx->pipe->cancel(msg);
}
_cancel_reason = reason;
_cancel_msg = msg;
Expand Down
5 changes: 3 additions & 2 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ Status FragmentExecState::execute() {
strings::Substitute("Got error while opening fragment $0, query id: $1",
print_id(_fragment_instance_id), print_id(_query_id)));
if (!st.ok()) {
cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "PlanFragmentExecutor open failed");
cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
fmt::format("PlanFragmentExecutor open failed, reason: {}", st.to_string()));
}
_executor.close();
}
Expand All @@ -287,7 +288,7 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const
// For stream load the fragment's query_id == load id, it is set in FE.
auto stream_load_ctx = _query_ctx->exec_env()->new_load_stream_mgr()->get(_query_id);
if (stream_load_ctx != nullptr) {
stream_load_ctx->pipe->cancel(PPlanFragmentCancelReason_Name(reason));
stream_load_ctx->pipe->cancel(msg);
}
#endif
_cancelled = true;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const
DCHECK(_prepared);
_cancel_reason = reason;
_cancel_msg = msg;
_runtime_state->set_is_cancelled(true);
_runtime_state->set_is_cancelled(true, msg);
// To notify wait_for_start()
_runtime_state->get_query_ctx()->set_ready_to_execute(true);

Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ class RuntimeState {

bool is_cancelled() const { return _is_cancelled.load(); }
int codegen_level() const { return _query_options.codegen_level; }
void set_is_cancelled(bool v) {
void set_is_cancelled(bool v, std::string msg) {
_is_cancelled.store(v);
// Create a error status, so that we could print error stack, and
// we could know which path call cancel.
LOG(INFO) << "task is cancelled, st = " << Status::Error<ErrorCode::CANCELLED>("");
LOG(INFO) << "task is cancelled, st = " << Status::Error<ErrorCode::CANCELLED>(msg);
}

void set_backend_id(int64_t backend_id) { _backend_id = backend_id; }
Expand Down
1 change: 1 addition & 0 deletions be/src/service/brpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Status BRpcService::start(int port, int num_threads) {
}

if (config::enable_https) {
LOG(INFO) << "BE brpc open ssl";
auto sslOptions = options.mutable_ssl_options();
sslOptions->default_cert.certificate = config::ssl_certificate_path;
sslOptions->default_cert.private_key = config::ssl_private_key_path;
Expand Down
Loading