Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[index] change to configuration parameters for index, use merge policies #27

Merged
merged 6 commits into from
Jan 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions keyvi/include/keyvi/index/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ namespace index {
class Index final : public internal::BaseIndexReader<internal::IndexWriterWorker> {
public:
explicit Index(const std::string& index_directory,
const std::chrono::milliseconds& flush_interval = std::chrono::milliseconds(1000))
: BaseIndexReader(index_directory, flush_interval), lock_file_() {
const keyvi::util::parameters_t& params = keyvi::util::parameters_t())
: BaseIndexReader(index_directory, params), lock_file_() {
index_directory_ = index_directory;

index_toc_file_ = index_directory_;
Expand Down
32 changes: 32 additions & 0 deletions keyvi/include/keyvi/index/internal/constants.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/* * keyvi - A key value store.
*
* Copyright 2018 Hendrik Muhs<hendrik.muhs@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* constants.h
*
* Created on: Jan 14, 2018
* Author: hendrik
*/

#ifndef KEYVI_INDEX_INTERNAL_CONSTANTS_H_
#define KEYVI_INDEX_INTERNAL_CONSTANTS_H_

// Parameter key under which the refresh interval of an index is configured.
static constexpr char INDEX_REFRESH_INTERVAL[] = "refresh_interval";

// Parameter key under which the name of the merge policy is configured.
static constexpr char MERGE_POLICY[] = "merge_policy";

// Merge policy name used when MERGE_POLICY is not configured.
static constexpr char DEFAULT_MERGE_POLICY[] = "simple";

#endif // KEYVI_INDEX_INTERNAL_CONSTANTS_H_
16 changes: 11 additions & 5 deletions keyvi/include/keyvi/index/internal/index_reader_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
#include "dictionary/dictionary.h"
#include "dictionary/fsa/internal/serialization_utils.h"
#include "dictionary/match.h"
#include "index/internal/constants.h"
#include "index/internal/segment.h"
#include "util/configuration.h"

// #define ENABLE_TRACING
#include "dictionary/util/trace.h"
Expand All @@ -54,9 +56,12 @@ namespace internal {

class IndexReaderWorker final {
public:
explicit IndexReaderWorker(const std::string index_directory,
size_t refresh_interval = 1 /*, optional external logger*/)
: index_toc_(), segments_(), stop_update_thread_(true) {
explicit IndexReaderWorker(const std::string index_directory, const keyvi::util::parameters_t& params)
: index_toc_(),
segments_(),
refresh_interval_(
std::chrono::milliseconds(keyvi::util::mapGet<uint64_t>(params, INDEX_REFRESH_INTERVAL, 1000))),
stop_update_thread_(true) {
index_directory_ = index_directory;

index_toc_file_ = index_directory_;
Expand Down Expand Up @@ -102,6 +107,7 @@ class IndexReaderWorker final {
std::time_t last_modification_time_;
boost::property_tree::ptree index_toc_;
segments_t segments_;
std::chrono::milliseconds refresh_interval_;
std::thread update_thread_;
std::atomic_bool stop_update_thread_;

Expand Down Expand Up @@ -159,8 +165,8 @@ class IndexReaderWorker final {
// reload
ReloadIndex();

// sleep for some time
std::this_thread::sleep_for(std::chrono::milliseconds(500));
// sleep for next refresh
std::this_thread::sleep_for(refresh_interval_);
}
}
};
Expand Down
60 changes: 41 additions & 19 deletions keyvi/include/keyvi/index/internal/index_writer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@

#include "dictionary/dictionary_compiler.h"
#include "dictionary/dictionary_types.h"
#include "index/internal/constants.h"
#include "index/internal/merge_job.h"
#include "index/internal/merge_policy_selector.h"
#include "index/internal/segment.h"
#include "util/active_object.h"
#include "util/configuration.h"
Expand All @@ -55,14 +57,15 @@ namespace internal {
class IndexWriterWorker final {
typedef std::shared_ptr<dictionary::JsonDictionaryCompilerSmallData> compiler_t;
struct IndexPayload {
explicit IndexPayload(const std::string& index_directory, const std::chrono::milliseconds& flush_interval)
explicit IndexPayload(const std::string& index_directory, const std::chrono::milliseconds& refresh_interval)
: compiler_(),
write_counter_(0),
segments_(),
index_directory_(index_directory),
merge_jobs_(),
last_flush_(),
flush_interval_(flush_interval),
refresh_interval_(refresh_interval),
any_delete_(false),
merge_enabled_(true) {
segments_ = std::make_shared<segment_vec_t>();
}
Expand All @@ -73,18 +76,22 @@ class IndexWriterWorker final {
boost::filesystem::path index_directory_;
std::list<MergeJob> merge_jobs_;
std::chrono::system_clock::time_point last_flush_;
std::chrono::duration<double> flush_interval_;
std::chrono::milliseconds refresh_interval_;
size_t max_concurrent_merges_ = 2;
bool any_delete_;
std::atomic_bool merge_enabled_;
};

public:
explicit IndexWriterWorker(const std::string& index_directory,
const std::chrono::milliseconds& flush_interval = std::chrono::milliseconds(1000))
: payload_(index_directory, flush_interval),
compiler_active_object_(&payload_, std::bind(&index::internal::IndexWriterWorker::ScheduledTask, this),
flush_interval) {
/**
 * Creates the writer worker for the index located in index_directory.
 *
 * @param index_directory directory of the index, forwarded to the payload
 * @param params configuration map; the entries read here are
 *        INDEX_REFRESH_INTERVAL (as uint64_t, wrapped into std::chrono::milliseconds)
 *        and MERGE_POLICY (as std::string, name of the merge policy).
 *        The third argument of mapGet (1000 / DEFAULT_MERGE_POLICY) is presumably
 *        the value used when the key is missing - TODO confirm against mapGet.
 */
explicit IndexWriterWorker(const std::string& index_directory, const keyvi::util::parameters_t& params)
// the refresh interval is looked up twice: once for the payload ...
: payload_(index_directory,
std::chrono::milliseconds(keyvi::util::mapGet<uint64_t>(params, INDEX_REFRESH_INTERVAL, 1000))),
// ... and once as third argument of the active object, which also gets ScheduledTask bound to this worker
compiler_active_object_(
&payload_, std::bind(&index::internal::IndexWriterWorker::ScheduledTask, this),
std::chrono::milliseconds(keyvi::util::mapGet<uint64_t>(params, INDEX_REFRESH_INTERVAL, 1000))) {
TRACE("construct worker: %s", payload_.index_directory_.c_str());

// merge_policy() returns a raw pointer; merge_policy_ takes ownership of it via reset()
merge_policy_.reset(merge_policy(keyvi::util::mapGet<std::string>(params, MERGE_POLICY, DEFAULT_MERGE_POLICY)));
}

IndexWriterWorker& operator=(IndexWriterWorker const&) = delete;
Expand Down Expand Up @@ -128,6 +135,7 @@ class IndexWriterWorker final {

void Delete(const std::string& key) {
compiler_active_object_([&key](IndexPayload& payload) {
payload.any_delete_ = true;
TRACE("delete key %s", key.c_str());

if (payload.compiler_) {
Expand Down Expand Up @@ -155,7 +163,11 @@ class IndexWriterWorker final {
*/
void Flush(bool async = true) {
TRACE("flush");
compiler_active_object_([](IndexPayload& payload) { Compile(&payload); });

compiler_active_object_([](IndexPayload& payload) {
PersistDeletes(&payload);
Compile(&payload);
});

if (async == false) {
std::mutex m;
Expand All @@ -173,6 +185,7 @@ class IndexWriterWorker final {

private:
IndexPayload payload_;
std::unique_ptr<MergePolicy> merge_policy_;
util::ActiveObject<IndexPayload> compiler_active_object_;

void ScheduledTask() {
Expand All @@ -187,7 +200,7 @@ class IndexWriterWorker final {
}

auto tp = std::chrono::system_clock::now();
if (tp - payload_.last_flush_ > payload_.flush_interval_) {
if (tp - payload_.last_flush_ > payload_.refresh_interval_) {
Compile(&payload_);
payload_.last_flush_ = tp;
}
Expand All @@ -203,6 +216,9 @@ class IndexWriterWorker final {
for (MergeJob& p : payload_.merge_jobs_) {
if (p.TryFinalize()) {
if (p.Successful()) {
// let the merge policy know that id is done
merge_policy_->MergeFinished(p.GetId());

TRACE("rewriting segment list");
any_merge_finalized = true;

Expand Down Expand Up @@ -274,13 +290,8 @@ class IndexWriterWorker final {
return;
}

std::vector<segment_t> to_merge;
for (segment_t& s : (*payload_.segments_)) {
if (!s->MarkedForMerge()) {
TRACE("Add to merge list %s", s->GetFilename().c_str());
to_merge.push_back(s);
}
}
size_t merge_policy_id = 0;
std::vector<segment_t> to_merge = merge_policy_->SelectMergeSegments(payload_.segments_, &merge_policy_id);

if (to_merge.size() < 1) {
return;
Expand All @@ -294,11 +305,22 @@ class IndexWriterWorker final {
s->MarkMerge();
}

// todo: add id
payload_.merge_jobs_.emplace_back(to_merge, 0, p);
payload_.merge_jobs_.emplace_back(to_merge, merge_policy_id, p);
payload_.merge_jobs_.back().Run();
}

// Calls Persist() on every segment of the payload, but only if a delete has
// been flagged via any_delete_; the flag is cleared afterwards.
static inline void PersistDeletes(IndexPayload* payload) {
  // no delete flagged: skip the walk over the segments
  if (!payload->any_delete_) {
    return;
  }

  for (segment_t& segment : *payload->segments_) {
    segment->Persist();
  }

  // all segments handled, reset the flag
  payload->any_delete_ = false;
}

static inline void Compile(IndexPayload* payload) {
if (!payload->compiler_) {
TRACE("no compiler found");
Expand Down
2 changes: 2 additions & 0 deletions keyvi/include/keyvi/index/internal/merge_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class MergeJob final {

const bool Merged() const { return payload_.merge_done; }

// Returns the id held by this job (id_).
size_t GetId() const {
  return id_;
}

// todo: ability to kill job/process

private:
Expand Down
3 changes: 1 addition & 2 deletions keyvi/include/keyvi/index/internal/merge_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

#include <vector>

#include "index/internal/merge_job.h"
#include "index/internal/segment.h"

namespace keyvi {
Expand All @@ -36,7 +35,7 @@ class MergePolicy {

virtual void MergeFinished(const size_t id) = 0;

virtual std::vector<segment_t> SelectMergeSegments(const std::vector<segment_t>& segments, size_t* id) = 0;
virtual std::vector<segment_t> SelectMergeSegments(const segments_t& segments, size_t* id) = 0;
};

} /* namespace internal */
Expand Down
52 changes: 52 additions & 0 deletions keyvi/include/keyvi/index/internal/merge_policy_selector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/* * keyvi - A key value store.
*
* Copyright 2018 Hendrik Muhs<hendrik.muhs@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* merge_policy_selector.h
*
* Created on: Jan 14, 2018
* Author: hendrik
*/

#ifndef KEYVI_INDEX_INTERNAL_MERGE_POLICY_SELECTOR_H_
#define KEYVI_INDEX_INTERNAL_MERGE_POLICY_SELECTOR_H_

#include <stdexcept>
#include <string>

#include "index/internal/merge_policy.h"
#include "index/internal/simple_merge_policy.h"

namespace keyvi {
namespace index {
namespace internal {

/**
 * Creates the merge policy registered under the given name.
 *
 * The name is compared case-insensitively; "simple" is the only known policy.
 *
 * @param name name of the merge policy
 * @return a newly allocated merge policy; the caller takes ownership
 * @throws std::invalid_argument if name is not a known merge policy
 *         (this includes the default, empty name)
 */
inline MergePolicy* merge_policy(const std::string& name = "") {
  // no dynamic exception specification: it is deprecated since C++11 and
  // ill-formed since C++17, the exception is documented above instead
  std::string lower_name = name;
  boost::algorithm::to_lower(lower_name);

  if (lower_name == "simple") {
    return new SimpleMergePolicy();
  }

  throw std::invalid_argument(name + " is not a valid merge policy");
}

} /* namespace internal */
} /* namespace index */
} /* namespace keyvi */

#endif /* KEYVI_INDEX_INTERNAL_MERGE_POLICY_SELECTOR_H_ */
46 changes: 37 additions & 9 deletions keyvi/include/keyvi/index/internal/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ namespace internal {
class Segment final {
public:
explicit Segment(const boost::filesystem::path& path, const bool load = true)
: path_(path), filename_(path.filename().string()), deleted_keys_(), dictionary_(), in_merge_(false) {
: path_(path),
filename_(path.filename().string()),
deleted_keys_(),
dictionary_(),
in_merge_(false),
new_delete_(false) {
if (load) {
Load();
}
Expand All @@ -62,33 +67,56 @@ class Segment final {

// Gives access to the file name of the segment (filename_), without copying it.
const std::string& GetFilename() const {
  return filename_;
}

void MarkMerge() { in_merge_ = true; }
// Flags the segment as being part of a merge.
//
// Persist() selects what it writes based on in_merge_, therefore pending
// deletes are persisted first, while the flag is still unset. Setting the
// flag before the call would make Persist() write the during-merge set
// instead of the deletes collected before the merge.
void MarkMerge() {
  Persist();
  in_merge_ = true;
}

void UnMarkMerge() { in_merge_ = false; }
// Clears the merge flag and moves the keys deleted while the merge was
// running over into the regular set of deleted keys.
void UnMarkMerge() {
  in_merge_ = false;

  for (const std::string& key : deleted_keys_during_merge_) {
    deleted_keys_.insert(key);
  }
  deleted_keys_during_merge_.clear();
}

// Tells whether the segment is currently flagged as being merged (in_merge_).
bool MarkedForMerge() const {
  return in_merge_;
}

void DeleteKey(const std::string& key) {
deleted_keys_.insert(key);
dirty_ = true;
if (in_merge_) {
deleted_keys_during_merge_.insert(key);
} else {
deleted_keys_.insert(key);
}
new_delete_ = true;
}

// persist deleted keys
void Persist() {
if (!new_delete_) {
return;
}

boost::filesystem::path deleted_keys_file = path_;
deleted_keys_file += ".dk";

std::ofstream out_stream(deleted_keys_file.string(), std::ios::binary);
msgpack::pack(out_stream, deleted_keys_);
// it's ensured that Persist is called before a merge, so we have to persist only one or the other file
if (in_merge_) {
deleted_keys_file += ".dkm";
std::ofstream out_stream(deleted_keys_file.string(), std::ios::binary);
msgpack::pack(out_stream, deleted_keys_during_merge_);
} else {
deleted_keys_file += ".dk";
std::ofstream out_stream(deleted_keys_file.string(), std::ios::binary);
msgpack::pack(out_stream, deleted_keys_);
}
}

private:
boost::filesystem::path path_;
std::string filename_;
std::set<std::string> deleted_keys_;
std::set<std::string> deleted_keys_during_merge_;
dictionary::dictionary_t dictionary_;
bool in_merge_;
bool dirty_;
bool new_delete_;

// Replaces dictionary_ with a dictionary newly created from the file at path_.
void Load() {
  dictionary_.reset(new dictionary::Dictionary(path_.string()));
}
};
Expand Down
Loading