Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <filesystem>

#include "common/status.h"
#include "io/fs/stream_sink_file_writer.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
#include "olap/rowset/segment_v2/inverted_index_reader.h"
Expand Down Expand Up @@ -124,6 +125,10 @@ Status InvertedIndexFileWriter::close() {
DCHECK(!_closed) << debug_string();
_closed = true;
if (_indices_dirs.empty()) {
// An empty file must still be created even if there are no indexes to write
if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr) {
return _idx_v2_writer->close();
}
return Status::OK();
}
DBUG_EXECUTE_IF("inverted_index_storage_format_must_be_v2", {
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/load_stream_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) {
LOG(INFO) << "file " << segid << " path " << file_writer->path().native() << "closed, written "
<< file_writer->bytes_appended() << " bytes"
<< ", file type is " << file_type;
if (file_writer->bytes_appended() == 0) {
// ‌Allow the index file to be empty when creating an index on a variant-type column.
if (file_writer->bytes_appended() == 0 && file_type != FileType::INVERTED_INDEX_FILE) {
return Status::Corruption("file {} closed with 0 bytes, file type is {}",
file_writer->path().native(), file_type);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <brpc/channel.h>
#include <brpc/server.h>

#include "gtest/gtest_pred_impl.h"
#include "io/fs/stream_sink_file_writer.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/inverted_index_file_writer.h"
#include "vec/sink/load_stream_stub.h"

namespace doris {

constexpr int64_t LOAD_ID_LO = 1;
constexpr int64_t LOAD_ID_HI = 2;
constexpr int64_t NUM_STREAM = 3;
constexpr static std::string_view tmp_dir = "./ut_dir/tmp";
class EmptyIndexFileTest : public testing::Test {
class MockStreamStub : public LoadStreamStub {
public:
MockStreamStub(PUniqueId load_id, int64_t src_id)
: LoadStreamStub(load_id, src_id, std::make_shared<IndexToTabletSchema>(),
std::make_shared<IndexToEnableMoW>()) {};

virtual ~MockStreamStub() = default;

// APPEND_DATA
virtual Status append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t segment_id, uint64_t offset, std::span<const Slice> data,
bool segment_eos = false,
FileType file_type = FileType::SEGMENT_FILE) override {
EXPECT_TRUE(segment_eos);
return Status::OK();
}
};

public:
EmptyIndexFileTest() = default;
~EmptyIndexFileTest() = default;

protected:
virtual void SetUp() {
_load_id.set_hi(LOAD_ID_HI);
_load_id.set_lo(LOAD_ID_LO);
for (int src_id = 0; src_id < NUM_STREAM; src_id++) {
_streams.emplace_back(new MockStreamStub(_load_id, src_id));
}
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(tmp_dir).ok());
std::vector<StorePath> paths;
paths.emplace_back(std::string(tmp_dir), 1024000000);
auto tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(paths);
EXPECT_TRUE(tmp_file_dirs->init().ok());
ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs));
}

virtual void TearDown() {
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
}

PUniqueId _load_id;
std::vector<std::shared_ptr<LoadStreamStub>> _streams;
};

TEST_F(EmptyIndexFileTest, test_empty_index_file) {
io::FileWriterPtr file_writer = std::make_unique<io::StreamSinkFileWriter>(_streams);
auto fs = io::global_local_filesystem();
std::string index_path = "/tmp/empty_index_file_test";
std::string rowset_id = "1234567890";
int64_t seg_id = 1234567890;
auto index_file_writer = std::make_unique<segment_v2::InvertedIndexFileWriter>(
fs, index_path, rowset_id, seg_id, InvertedIndexStorageFormatPB::V2,
std::move(file_writer));
EXPECT_TRUE(index_file_writer->close().ok());
}

} // namespace doris
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 "abcd"

Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_variant_empty_index_file", "p0") {
def tableName = "test_variant_empty_index_file"
sql """ drop table if exists ${tableName} """
// create table
sql """
CREATE TABLE IF NOT EXISTS ${tableName}
(
`id` bigint NOT NULL,
`v` variant NULL,
INDEX v_idx (`v`) USING INVERTED
) DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH (`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"inverted_index_storage_format" = "v2",
"disable_auto_compaction" = "true"
);
"""

sql """ set enable_memtable_on_sink_node = true """
sql """ insert into ${tableName} values (1, 'abcd') """

def tablets = sql_return_maparray """ show tablets from ${tableName}; """

def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

String tablet_id = tablets[0].TabletId
String backend_id = tablets[0].BackendId
String ip = backendId_to_backendIP.get(backend_id)
String port = backendId_to_backendHttpPort.get(backend_id)
if (!isCloudMode()) {
def (code, out, err) = http_client("GET", String.format("http://%s:%s/api/show_nested_index_file?tablet_id=%s", ip, port, tablet_id))
logger.info("Run show_nested_index_file_on_tablet: code=" + code + ", out=" + out + ", err=" + err)
assertEquals("E-6002", parseJson(out.trim()).status)
}

qt_sql """ select * from ${tableName} where v match 'abcd'"""
}
Loading