Skip to content

Commit

Permalink
Merge 0f50420 into 7cba719
Browse files Browse the repository at this point in the history
  • Loading branch information
rbost committed Dec 21, 2018
2 parents 7cba719 + 0f50420 commit 2b1b306
Show file tree
Hide file tree
Showing 39 changed files with 552 additions and 971 deletions.
10 changes: 6 additions & 4 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,18 @@ IncludeCategories:
Priority: 3
- Regex: '^<(grpc|grpc\+\+|google/protobuf)/.*\.(h|hpp)'
Priority: 4
- Regex: '^<spdlog/.*\.(h|hpp)'
Priority: 4
- Regex: '^(<|")(gtest)/.*\.(h|hpp)'
Priority: 7
- Regex: '^<.*\.h>'
Priority: 5
- Regex: '^<(chrono|condition_variable)>'
Priority: 6
- Regex: '^<c.*>'
Priority: 5
- Regex: '^<.*'
Priority: 6
- Regex: '^<(chrono|condition_variable)>'
Priority: 6
- Regex: '^(<|")(gtest)/.*\.(h|hpp)'
Priority: 7
- Regex: '.*'
Priority: 8
IncludeIsMainRegex: '([-_](test|unittest))?$'
Expand Down
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@
[submodule "externals/sanitizers-cmake"]
path = externals/sanitizers-cmake
url = https://github.com/arsenm/sanitizers-cmake.git
[submodule "externals/spdlog"]
path = externals/spdlog
url = https://github.com/gabime/spdlog.git
branch = v1.2.1
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ find_package(GRPC)
message(STATUS "Protoc path: " ${Protobuf_PROTOC_EXECUTABLE})
message(STATUS "GRPC plugin path: " ${GRPC_CPP_PLUGIN})

# Import spdlog
add_subdirectory(externals/spdlog)

# Disable memory locks because of race conditions.
# Disable memory locks because of race conditions.
set(ENABLE_MEMORY_LOCK OFF CACHE BOOL "Disable Memory Lock" FORCE)
add_subdirectory(third_party/crypto/src)

Expand Down
1 change: 1 addition & 0 deletions externals/spdlog
Submodule spdlog added at 10e809
2 changes: 2 additions & 0 deletions lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ target_link_libraries(
OpenSSE::crypto
OpenSSE::dbparser
${ROCKSDB_LIBRARIES}
spdlog::spdlog
)

set(PROTOS
Expand Down Expand Up @@ -117,4 +118,5 @@ target_link_libraries(
OpenSSE::schemes
gRPC::grpc++
protobuf::libprotobuf
spdlog::spdlog
)
74 changes: 25 additions & 49 deletions lib/diana/client_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include <grpc/grpc.h>

#include <chrono>

#include <fstream>
#include <iostream>
#include <memory>
Expand Down Expand Up @@ -219,13 +218,10 @@ bool DianaClientRunner::send_setup() const
grpc::Status status = stub_->setup(&context, message, &e);

if (status.ok()) {
logger::log(logger::LoggerSeverity::TRACE)
<< "Setup succeeded." << std::endl;
logger::logger()->info("Server setup succeeded.");
} else {
logger::log(logger::LoggerSeverity::ERROR)
<< "Setup failed: " << std::endl;
logger::log(logger::LoggerSeverity::ERROR)
<< status.error_message() << std::endl;
logger::logger()->error("Server setup failed: \n"
+ status.error_message());
return false;
}

Expand All @@ -245,8 +241,7 @@ std::list<uint64_t> DianaClientRunner::search(
const std::string& keyword,
const std::function<void(uint64_t)>& receive_callback) const
{
logger::log(logger::LoggerSeverity::TRACE)
<< "Search " << keyword << std::endl;
logger::logger()->trace("Searching keyword: " + keyword);

grpc::ClientContext context;
SearchRequestMessage message;
Expand All @@ -264,9 +259,6 @@ std::list<uint64_t> DianaClientRunner::search(


while (reader->Read(&reply)) {
// logger::log(logger::LoggerSeverity::TRACE) << "New result
// received: "
// << std::dec << reply.result() << std::endl;
results.push_back(reply.result());

if (receive_callback != nullptr) {
Expand All @@ -275,13 +267,9 @@ std::list<uint64_t> DianaClientRunner::search(
}
grpc::Status status = reader->Finish();
if (status.ok()) {
logger::log(logger::LoggerSeverity::TRACE)
<< "Search succeeded." << std::endl;
logger::logger()->trace("Search succeeded.");
} else {
logger::log(logger::LoggerSeverity::ERROR)
<< "Search failed:" << std::endl;
logger::log(logger::LoggerSeverity::ERROR)
<< status.error_message() << std::endl;
logger::logger()->error("Search failed: \n" + status.error_message());
}

return results;
Expand All @@ -303,13 +291,10 @@ void DianaClientRunner::insert(const std::string& keyword, uint64_t index)
grpc::Status status = stub_->insert(&context, message, &e);

if (status.ok()) {
logger::log(logger::LoggerSeverity::TRACE)
<< "Update succeeded." << std::endl;
logger::logger()->trace("Update succeeded.");
} else {
logger::log(logger::LoggerSeverity::ERROR)
<< "Update failed:" << std::endl;
logger::log(logger::LoggerSeverity::ERROR)
<< status.error_message() << std::endl;
logger::logger()->error("Update failed:\n"
+ status.error_message());
}
}
}
Expand All @@ -326,8 +311,7 @@ void DianaClientRunner::insert_in_session(const std::string& keyword,

bulk_update_state_.mtx.lock();
if (!bulk_update_state_.writer->Write(message)) {
logger::log(logger::LoggerSeverity::ERROR)
<< "Update session: broken stream." << std::endl;
logger::logger()->error("Update session stopped: broken stream.");
}
bulk_update_state_.mtx.unlock();
}
Expand All @@ -347,8 +331,7 @@ void DianaClientRunner::insert_in_session(

for (auto& it : message_list) {
if (!bulk_update_state_.writer->Write(request_to_message(it))) {
logger::log(logger::LoggerSeverity::ERROR)
<< "Update session: broken stream." << std::endl;
logger::logger()->error("Update session stopped: broken stream.");
break;
}
}
Expand All @@ -358,9 +341,8 @@ void DianaClientRunner::insert_in_session(
void DianaClientRunner::start_update_session()
{
if (bulk_update_state_.writer) {
logger::log(logger::LoggerSeverity::WARNING)
<< "Invalid client state: the bulk update session is already up"
<< std::endl;
logger::logger()->warn(
"Invalid client state: the bulk update session is already up");
return;
}

Expand All @@ -369,34 +351,31 @@ void DianaClientRunner::start_update_session()
bulk_update_state_.context.get(), &(bulk_update_state_.response));
bulk_update_state_.is_up = true;

logger::log(logger::LoggerSeverity::TRACE)
<< "Update session started." << std::endl;
logger::logger()->trace("Update session started.");
}

void DianaClientRunner::end_update_session()
{
if (!bulk_update_state_.writer) {
logger::log(logger::LoggerSeverity::WARNING)
<< "Invalid client state: the bulk update session is not up"
<< std::endl;
logger::logger()->warn(
"Invalid client state: the bulk update session is not up");
return;
}

bulk_update_state_.writer->WritesDone();
::grpc::Status status = bulk_update_state_.writer->Finish();

if (!status.ok()) {
logger::log(logger::LoggerSeverity::ERROR)
<< "Status not OK at the end of update sessions. Status: "
<< status.error_message() << std::endl;
logger::logger()->error(
"Status not OK at the end of update sessions. Status: \n"
+ status.error_message());
}

bulk_update_state_.is_up = false;
bulk_update_state_.context.reset();
bulk_update_state_.writer.reset();

logger::log(logger::LoggerSeverity::TRACE)
<< "Update session terminated." << std::endl;
logger::logger()->trace("Update session terminated.");
}


Expand Down Expand Up @@ -427,9 +406,8 @@ bool DianaClientRunner::load_inverted_index(const std::string& path)
counter++;

if ((counter % 100) == 0) {
logger::log(sse::logger::LoggerSeverity::INFO)
<< "\rLoading: " << counter << " keywords processed"
<< std::flush;
logger::logger()->info("Loading: {} keywords processed",
counter);
}
};
pool.enqueue(work, kw, docs);
Expand All @@ -443,16 +421,14 @@ bool DianaClientRunner::load_inverted_index(const std::string& path)
parser.parse();

pool.join();
logger::log(sse::logger::LoggerSeverity::INFO)
<< "\rLoading: " << counter << " keywords processed" << std::endl;
logger::logger()->info("Loading: {} keywords processed", counter);

end_update_session();

return true;
} catch (std::exception& e) {
logger::log(logger::LoggerSeverity::ERROR)
<< "\nFailed to load file " << path << " : " << e.what()
<< std::endl;
logger::logger()->error("Failed to load file " + path + ": \n"
+ e.what());
return false;
}
return false;
Expand Down
Loading

0 comments on commit 2b1b306

Please sign in to comment.