Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flex): Implemented compaction. #3482

Merged
merged 8 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ option(BUILD_DOC "Whether to build doc" ON)
option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" ON)
option(MONITOR_SESSIONS "Whether monitor sessions" OFF)


# ------------------------------------------------------------------------------
# cmake configs
# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -55,16 +54,6 @@ if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
"Debug" "Release" "MinSizeRel" "RelWithDebInfo")
endif ()

if (APPLE)
set(CMAKE_MACOSX_RPATH ON)
else ()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wl,-rpath,$ORIGIN")
endif ()

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 -Wall -fPIC")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -g")

add_compile_definitions(FLEX_VERSION="${FLEX_VERSION}")

if (APPLE)
Expand All @@ -77,6 +66,10 @@ else ()
endif ()
endif ()

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 -Wall -fPIC")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -g")

find_package(MPI REQUIRED)
include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH})

Expand Down
2 changes: 1 addition & 1 deletion flex/bin/rt_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ int main(int argc, char** argv) {
auto& db = gs::GraphDB::get();

auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
db.Open(schema, data_path, shard_num, warmup, memory_only);
db.Open(schema, data_path, shard_num, warmup, memory_only, true, http_port);

t0 += grape::GetCurrentTime();

Expand Down
1 change: 1 addition & 0 deletions flex/engines/graph_db/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
${CMAKE_CURRENT_SOURCE_DIR}/database/single_edge_insert_transaction.h
${CMAKE_CURRENT_SOURCE_DIR}/database/single_vertex_insert_transaction.h
${CMAKE_CURRENT_SOURCE_DIR}/database/update_transaction.h
${CMAKE_CURRENT_SOURCE_DIR}/database/compact_transaction.h
${CMAKE_CURRENT_SOURCE_DIR}/database/version_manager.h
${CMAKE_CURRENT_SOURCE_DIR}/database/wal.h
${CMAKE_CURRENT_SOURCE_DIR}/database/transaction_utils.h
Expand Down
8 changes: 8 additions & 0 deletions flex/engines/graph_db/app/server_app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ bool ServerApp::Query(Decoder& input, Encoder& output) {
}
return true;
}
} else if (op == "COMPACTION") {
bool ret = graph_.Compact();
if (ret) {
output.put_string("SUCCESS");
} else {
output.put_string("ABORTED");
}
return true;
}
return false;
}
Expand Down
61 changes: 61 additions & 0 deletions flex/engines/graph_db/database/compact_transaction.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "flex/engines/graph_db/database/compact_transaction.h"
#include "flex/engines/graph_db/database/version_manager.h"
#include "flex/engines/graph_db/database/wal.h"
#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"

namespace gs {

CompactTransaction::CompactTransaction(MutablePropertyFragment& graph,
WalWriter& logger, VersionManager& vm,
timestamp_t timestamp)
: graph_(graph), logger_(logger), vm_(vm), timestamp_(timestamp) {
arc_.Resize(sizeof(WalHeader));
}

CompactTransaction::~CompactTransaction() { Abort(); }

timestamp_t CompactTransaction::timestamp() const { return timestamp_; }

void CompactTransaction::Commit() {
if (timestamp_ != std::numeric_limits<timestamp_t>::max()) {
auto* header = reinterpret_cast<WalHeader*>(arc_.GetBuffer());
header->length = 0;
header->timestamp = timestamp_;
header->type = 1;

logger_.append(arc_.GetBuffer(), arc_.GetSize());
arc_.Clear();

LOG(INFO) << "before compact - " << timestamp_;
graph_.Compact(timestamp_);
LOG(INFO) << "after compact - " << timestamp_;

vm_.release_update_timestamp(timestamp_);
timestamp_ = std::numeric_limits<timestamp_t>::max();
}
}

void CompactTransaction::Abort() {
if (timestamp_ != std::numeric_limits<timestamp_t>::max()) {
arc_.Clear();
vm_.revert_update_timestamp(timestamp_);
timestamp_ = std::numeric_limits<timestamp_t>::max();
}
}

} // namespace gs
51 changes: 51 additions & 0 deletions flex/engines/graph_db/database/compact_transaction.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef GRAPHSCOPE_DATABASE_COMPACT_TRANSACTION_H_
#define GRAPHSCOPE_DATABASE_COMPACT_TRANSACTION_H_

#include "flex/storages/rt_mutable_graph/types.h"
#include "grape/serialization/in_archive.h"

namespace gs {

class MutablePropertyFragment;
class WalWriter;
class VersionManager;

class CompactTransaction {
public:
CompactTransaction(MutablePropertyFragment& graph, WalWriter& logger,
VersionManager& vm, timestamp_t timestamp);
~CompactTransaction();

timestamp_t timestamp() const;

void Commit();

void Abort();

private:
MutablePropertyFragment& graph_;
WalWriter& logger_;
VersionManager& vm_;
timestamp_t timestamp_;

grape::InArchive arc_;
};

} // namespace gs

#endif // GRAPHSCOPE_DATABASE_COMPACT_TRANSACTION_H_
80 changes: 77 additions & 3 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "flex/engines/graph_db/database/wal.h"
#include "flex/utils/yaml_utils.h"

#include "flex/third_party/httplib.h"

namespace gs {

struct SessionLocalContext {
Expand All @@ -41,6 +43,10 @@

GraphDB::GraphDB() = default;
GraphDB::~GraphDB() {
if (compact_thread_running_) {
compact_thread_running_ = false;
compact_thread_.join();
}
for (int i = 0; i < thread_num_; ++i) {
contexts_[i].~SessionLocalContext();
}
Expand All @@ -52,64 +58,112 @@
return db;
}

Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
int32_t thread_num, bool warmup, bool memory_only) {
int32_t thread_num, bool warmup, bool memory_only,
bool enable_auto_compaction, int port) {
if (!std::filesystem::exists(data_dir)) {
std::filesystem::create_directories(data_dir);
}

std::string schema_file = schema_path(data_dir);
bool create_empty_graph = false;
if (!std::filesystem::exists(schema_file)) {
create_empty_graph = true;
graph_.mutable_schema() = schema;
}
work_dir_ = data_dir;
thread_num_ = thread_num;
try {
graph_.Open(data_dir, memory_only);
} catch (std::exception& e) {
LOG(ERROR) << "Exception: " << e.what();
return Result<bool>(StatusCode::InternalError,
"Exception: " + std::string(e.what()), false);
}

if ((!create_empty_graph) && (!graph_.schema().Equals(schema))) {
LOG(ERROR) << "Schema inconsistent..\n";
return Result<bool>(StatusCode::InternalError,
"Schema of work directory is not compatible with the "
"graph schema",
false);
}
// Set the plugin info from schema to graph_.schema(), since the plugin info
// is not serialized and deserialized.
auto& mutable_schema = graph_.mutable_schema();
mutable_schema.SetPluginDir(schema.GetPluginDir());
std::vector<std::string> plugin_paths;
const auto& plugins = schema.GetPlugins();
for (auto plugin_pair : plugins) {
plugin_paths.emplace_back(plugin_pair.first);
}

std::sort(plugin_paths.begin(), plugin_paths.end(),
[&](const std::string& a, const std::string& b) {
return plugins.at(a).second < plugins.at(b).second;
});
mutable_schema.EmplacePlugins(plugin_paths);

last_compaction_ts_ = 0;
openWalAndCreateContexts(data_dir, memory_only);

if ((!create_empty_graph) && warmup) {
graph_.Warmup(thread_num_);
}

if (enable_auto_compaction && (port != -1)) {
if (compact_thread_running_) {
compact_thread_running_ = false;
compact_thread_.join();
}
compact_thread_running_ = true;
compact_thread_ = std::thread([&](int http_port) {
size_t last_compaction_at = 0;
while (compact_thread_running_) {
size_t query_num_before = getExecutedQueryNum();
sleep(30);
if (!compact_thread_running_) {
break;
}
size_t query_num_after = getExecutedQueryNum();
if (query_num_before == query_num_after &&
(query_num_after > (last_compaction_at + 100000))) {
VLOG(10) << "Trigger auto compaction";
last_compaction_at = query_num_after;
std::string url = "127.0.0.1";
httplib::Client cli(url, http_port);
cli.set_connection_timeout(0, 300000);
cli.set_read_timeout(300, 0);
cli.set_write_timeout(300, 0);

std::vector<char> buf;
Encoder encoder(buf);
encoder.put_string("COMPACTION");
encoder.put_byte(0);
std::string content(buf.data(), buf.size());
auto res = cli.Post("/interactive/query", content, "text/plain");
std::string ret = res->body;
Decoder decoder(ret.data(), ret.size());
std::string_view info = decoder.get_string();

VLOG(10) << "Finish compaction, info: " << info;
}
}
}, port);
}

return Result<bool>(true);
}

Check notice on line 157 in flex/engines/graph_db/database/graph_db.cc

View check run for this annotation

codefactor.io / CodeFactor

flex/engines/graph_db/database/graph_db.cc#L61-L157

Complex Method
void GraphDB::Close() {
#ifdef MONITOR_SESSIONS
monitor_thread_running_ = false;
monitor_thread_.join();
#endif
if (compact_thread_running_) {
compact_thread_running_ = false;
compact_thread_.join();
}
//-----------Clear graph_db----------------
graph_.Clear();
version_manager_.clear();
Expand Down Expand Up @@ -152,6 +206,13 @@

int GraphDB::SessionNum() const { return thread_num_; }

void GraphDB::UpdateCompactionTimestamp(timestamp_t ts) {
last_compaction_ts_ = ts;
}
timestamp_t GraphDB::GetLastCompactionTimestamp() const {
return last_compaction_ts_;
}

const MutablePropertyFragment& GraphDB::graph() const { return graph_; }
MutablePropertyFragment& GraphDB::graph() { return graph_; }

Expand Down Expand Up @@ -236,8 +297,13 @@
if (from_ts < to_ts) {
IngestWalRange(contexts_, graph_, parser, from_ts, to_ts, thread_num);
}
UpdateTransaction::IngestWal(graph_, work_dir, to_ts, update_wal.ptr,
update_wal.size, contexts_[0].allocator);
if (update_wal.size == 0) {
graph_.Compact(update_wal.timestamp);
last_compaction_ts_ = update_wal.timestamp;
} else {
UpdateTransaction::IngestWal(graph_, work_dir, to_ts, update_wal.ptr,
update_wal.size, contexts_[0].allocator);
}
from_ts = to_ts + 1;
}
if (from_ts <= parser.last_ts()) {
Expand Down Expand Up @@ -353,4 +419,12 @@
#endif
}

size_t GraphDB::getExecutedQueryNum() const {
size_t ret = 0;
for (int i = 0; i < thread_num_; ++i) {
ret += contexts_[i].session.query_num();
}
return ret;
}

} // namespace gs
12 changes: 11 additions & 1 deletion flex/engines/graph_db/database/graph_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class GraphDB {
*/
Result<bool> Open(const Schema& schema, const std::string& data_dir,
int32_t thread_num = 1, bool warmup = false,
bool memory_only = true);
bool memory_only = true,
bool enable_auto_compaction = false, int port = -1);

/**
* @brief Close the current opened graph.
Expand Down Expand Up @@ -115,6 +116,9 @@ class GraphDB {

int SessionNum() const;

void UpdateCompactionTimestamp(timestamp_t ts);
timestamp_t GetLastCompactionTimestamp() const;

private:
bool registerApp(const std::string& path, uint8_t index = 0);

Expand All @@ -128,6 +132,8 @@ class GraphDB {
void openWalAndCreateContexts(const std::string& data_dir_path,
bool memory_only);

size_t getExecutedQueryNum() const;

friend class GraphDBSession;

std::string work_dir_;
Expand All @@ -145,6 +151,10 @@ class GraphDB {
std::thread monitor_thread_;
bool monitor_thread_running_;
#endif

timestamp_t last_compaction_ts_;
bool compact_thread_running_ = false;
std::thread compact_thread_;
};

} // namespace gs
Expand Down
Loading
Loading