Skip to content
Browse files

added simple prefetch strategy

  • Loading branch information...
1 parent a92465f commit d48b109e3073f597e080f596652d41614ea6453b @bnicolae committed
View
14 bindings/blob-fuse/local_mirror.hpp
@@ -39,6 +39,7 @@ class local_mirror_t {
boost::uint64_t blob_size, page_size;
std::vector<boost::uint64_t> chunk_map;
std::vector<bool> written_map;
+ blob::prefetch_list_t prefetch_list;
};
template <class Object>
@@ -126,7 +127,7 @@ boost::uint64_t local_mirror_t<Object>::read(size_t size, off_t off, char * &buf
read_size += page_size;
DBG("READ OP - remote read request issued (off, size) = (" << read_off <<
", " << read_size << ")");
- if (!blob->read(read_off, read_size, mapping + read_off, version))
+ if (!blob->read(read_off, read_size, mapping + read_off, version, 1, prefetch_list))
return 0;
while (index * page_size < read_off + read_size) {
chunk_map[index] = page_size;
@@ -135,6 +136,17 @@ boost::uint64_t local_mirror_t<Object>::read(size_t size, off_t off, char * &buf
} else
index++;
}
+ for (blob::prefetch_list_t::iterator i = prefetch_list.begin(); i != prefetch_list.end(); i++) {
+ boost::uint64_t index = i->first / page_size;
+ if (chunk_map[index] < page_size) {
+ DBG("READ OP (HINT) - remote read request issued (off, size) = ("
+ << i->first + chunk_map[index] << ", " << page_size - chunk_map[index] << ")");
+ if (!blob->read(i->first + chunk_map[index], page_size - chunk_map[index],
+ mapping + i->first + chunk_map[index], version))
+ return 0;
+ chunk_map[index] = page_size;
+ }
+ }
buf = mapping + off;
return size;
View
11 client/object_handler.cpp
@@ -180,9 +180,10 @@ bool object_handler::get_locations(page_locations_t &loc, boost::uint64_t offset
std::vector<random_select> vadv(nbr_vadv);
metadata::query_t range(query_root.node.id, query_root.node.version, new_offset, new_size);
+ blob::prefetch_list_t unused;
TIMER_START(meta_timer);
- bool result = query->readRecordLocations(vadv, range, query_root);
+ bool result = query->readRecordLocations(vadv, unused, range, query_root, 0xFFFFFFFF);
TIMER_STOP(meta_timer, "GET_LOCATIONS " << range << ": Metadata read operation, success: " << result);
if (!result)
return false;
@@ -201,7 +202,9 @@ bool object_handler::get_locations(page_locations_t &loc, boost::uint64_t offset
return true;
}
-bool object_handler::read(boost::uint64_t offset, boost::uint64_t size, char *buffer, boost::uint32_t version) {
+bool object_handler::read(boost::uint64_t offset, boost::uint64_t size, char *buffer,
+ boost::uint32_t version, boost::uint32_t threshold,
+ const blob::prefetch_list_t &prefetch_list) {
metadata::root_t query_root(0, 0, 0, 0, 0);
if (version == 0)
@@ -229,7 +232,9 @@ bool object_handler::read(boost::uint64_t offset, boost::uint64_t size, char *bu
metadata::query_t range(query_root.node.id, query_root.node.version, new_offset, new_size);
TIMER_START(meta_timer);
- bool result = query->readRecordLocations(vadv, range, query_root);
+ // !! BAD practice to const_cast. Interface needs to be redesigned.
+ bool result = query->readRecordLocations(vadv, const_cast<blob::prefetch_list_t &>(prefetch_list),
+ range, query_root, threshold);
TIMER_STOP(meta_timer, "READ " << range << ": Metadata read operation, success: " << result);
if (!result)
return false;
View
10 client/object_handler.hpp
@@ -1,3 +1,5 @@
+#include <set>
+
#include <boost/random.hpp>
#include <boost/dynamic_bitset.hpp>
@@ -30,8 +32,12 @@ class object_handler {
bool clone(boost::int32_t id = 0, boost::int32_t version = 0);
bool get_latest(boost::uint32_t id = 0);
- bool read(boost::uint64_t offset, boost::uint64_t size, char *buffer, boost::uint32_t version = 0);
- bool get_locations(page_locations_t &loc, boost::uint64_t offset, boost::uint64_t size, boost::uint32_t version = 0);
+ bool read(boost::uint64_t offset, boost::uint64_t size, char *buffer,
+ boost::uint32_t version = 0, boost::uint32_t threshold = 0xFFFFFFFF,
+ const blob::prefetch_list_t &prefetch_list = blob::prefetch_list_t());
+
+ bool get_locations(page_locations_t &loc, boost::uint64_t offset, boost::uint64_t size,
+ boost::uint32_t version = 0);
bool append(boost::uint64_t size, char *buffer);
bool write(boost::uint64_t offset, boost::uint64_t size, char *buffer);
View
42 client/range_query.cpp
@@ -229,6 +229,7 @@ static void leaf_callback(bool &result,
static void read_callback(dht_t *dht, metadata::query_t &range, bool &result,
std::vector<interval_range_query::replica_policy_t> &leaves,
boost::uint64_t page_size, boost::uint64_t offset,
+ boost::uint32_t threshold, blob::prefetch_list_t &prefetch_list,
buffer_wrapper val) {
metadata::dhtnode_t node(false);
@@ -236,28 +237,39 @@ static void read_callback(dht_t *dht, metadata::query_t &range, bool &result,
return;
DBG("READ NODE " << node);
if (node.is_leaf) {
- dht->get(buffer_wrapper(node.left, true),
- boost::bind(leaf_callback, boost::ref(result),
- boost::ref(leaves), node.left,
- (offset - range.offset) / page_size, _1));
- return;
+ // Leaf is part of the request
+ if (node.left.intersects(range))
+ dht->get(buffer_wrapper(node.left, true),
+ boost::bind(leaf_callback, boost::ref(result),
+ boost::ref(leaves), node.left,
+ (offset - range.offset) / page_size, _1));
+ // Leaf is a prefetch candidate
+ else if (node.access_count > threshold && prefetch_list[offset] < node.access_count)
+ prefetch_list[offset] = node.access_count;
+ } else {
+ if (node.left.intersects(range) || node.access_count > threshold)
+ dht->get(buffer_wrapper(node.left, true),
+ boost::bind(read_callback, dht, boost::ref(range), boost::ref(result),
+ boost::ref(leaves), page_size, node.left.offset,
+ threshold, boost::ref(prefetch_list), _1));
+ if (node.right.intersects(range) || node.access_count > threshold)
+ dht->get(buffer_wrapper(node.right, true),
+ boost::bind(read_callback, dht, boost::ref(range), boost::ref(result),
+ boost::ref(leaves), page_size, node.right.offset,
+ threshold, boost::ref(prefetch_list), _1));
}
- if (node.left.intersects(range))
- dht->get(buffer_wrapper(node.left, true),
- boost::bind(read_callback, dht, boost::ref(range), boost::ref(result),
- boost::ref(leaves), page_size, node.left.offset, _1));
- if (node.right.intersects(range))
- dht->get(buffer_wrapper(node.right, true),
- boost::bind(read_callback, dht, boost::ref(range), boost::ref(result),
- boost::ref(leaves), page_size, node.right.offset, _1));
}
-bool interval_range_query::readRecordLocations(std::vector<interval_range_query::replica_policy_t> &leaves, metadata::query_t &range, metadata::root_t &root) {
+bool interval_range_query::readRecordLocations(std::vector<interval_range_query::replica_policy_t> &leaves,
+ blob::prefetch_list_t &prefetch_list,
+ metadata::query_t &range, metadata::root_t &root,
+ boost::uint32_t threshold) {
bool result = true;
dht->get(buffer_wrapper(root.node, true),
boost::bind(read_callback, dht, boost::ref(range), boost::ref(result),
- boost::ref(leaves), root.page_size, 0, _1));
+ boost::ref(leaves), root.page_size, 0,
+ threshold, boost::ref(prefetch_list), _1));
dht->wait();
return result;
}
View
13 client/range_query.hpp
@@ -3,6 +3,7 @@
#include <deque>
#include <vector>
+#include <map>
#include "common/structures.hpp"
#include "common/simple_dht.hpp"
@@ -10,6 +11,13 @@
#include "replica_policy.hpp"
+namespace blob {
+ // entries in the prefetch list are of the form (offset, access count)
+ typedef std::pair<boost::uint64_t, boost::uint32_t> prefetch_entry_t;
+ // entries are stored in a std::map keyed by offset; the mapped value is the access count
+ typedef std::map<boost::uint64_t, boost::uint32_t> prefetch_list_t;
+}
+
class interval_range_query {
public:
// typedef cached_dht<async_dht<bamboo_dht>, buffer_wrapper_hash> dht_t;
@@ -23,7 +31,10 @@ class interval_range_query {
~interval_range_query();
bool readRecordLocations(std::vector<replica_policy_t> &leaves,
- metadata::query_t &range, metadata::root_t &root);
+ blob::prefetch_list_t &prefetch_list,
+ metadata::query_t &range, metadata::root_t &root,
+ boost::uint32_t threshold);
+
bool writeRecordLocations(vmgr_reply &mgr_reply, node_deque_t &node_deque,
metadata::replica_list_t &provider_list);
private:
View
6 common/buffer_wrapper.hpp
@@ -57,6 +57,12 @@ class buffer_wrapper {
template <class T> buffer_wrapper(T const &content, bool serialize);
// Replaces this wrapper's len, content_ptr and content with those of v.
// NOTE(review): v is taken by value, so the exchange happens with a local copy
// and the caller's argument is not modified through this signature. Confirm
// that assignment-like behaviour is what callers expect from a method named swap.
// NOTE(review): the hash member initialized by the constructors is not exchanged
// here; if it caches a value derived from the content it may be stale after
// this call - confirm.
+ void swap(buffer_wrapper v) {
+ unsigned int x = len; len = v.len; v.len = x;
+ char *y = content_ptr; content_ptr = v.content_ptr; v.content_ptr = y;
+ content.swap(v.content);
+ }
+
buffer_wrapper(char *ct, unsigned int size, bool is_managed = false) :
len(size), content_ptr(ct), hash(0) {
if (is_managed)
View
7 common/structures.hpp
@@ -112,6 +112,7 @@ class dhtnode_t {
public:
bool is_leaf;
query_t left, right;
+ boost::uint32_t access_count;
// Builds a node flagged as leaf (true) or inner node (false).
// access_count is zeroed explicitly: it is written by serialize(), so leaving it
// uninitialized would store an indeterminate counter for freshly created nodes.
dhtnode_t(bool leaf) : is_leaf(leaf), access_count(0) { }
@@ -124,7 +125,7 @@ class dhtnode_t {
}
template <class Archive> void serialize(Archive &ar, unsigned int) {
- ar & left & right & is_leaf;
+ ar & left & right & is_leaf & access_count;
}
};
@@ -161,7 +162,8 @@ class vmgr_reply {
vmgr_reply() : stable_root(0, 0, 0, 0, 0) { }
- static metadata::query_t search_list(metadata::siblings_enum_t &siblings, boost::uint64_t offset, boost::uint64_t size) {
+ static metadata::query_t search_list(metadata::siblings_enum_t &siblings, boost::uint64_t offset,
+ boost::uint64_t size) {
for (unsigned int i = 0; i < siblings.size(); i++)
if (siblings[i].offset == offset && siblings[i].size == size)
return siblings[i];
@@ -173,5 +175,4 @@ class vmgr_reply {
}
};
-
#endif
View
2 pmanager/adv_manager.hpp
@@ -108,7 +108,7 @@ class adv_manager {
typedef boost::multi_index::index<adv_table_t, tinfo>::type adv_table_by_info;
typedef boost::multi_index::index<adv_table_t, ttime>::type adv_table_by_time;
- static const unsigned int WATCHDOG_TIMEOUT = 30;
+ static const unsigned int WATCHDOG_TIMEOUT = 120;
public:
rpcreturn_t update(const rpcvector_t &params, rpcvector_t &result, const std::string &id);
View
2 provider/CMakeLists.txt
@@ -1,5 +1,5 @@
add_executable (provider provider.cpp pmgr_listener.cpp bdb_bw_map.cpp null_bw_map.cpp)
-add_executable (sdht sdht.cpp bdb_bw_map.cpp null_bw_map.cpp)
+add_executable (sdht sdht.cpp bdb_bw_map.cpp meta_listener.cpp null_bw_map.cpp)
# Link the executable to the necessary libraries.
target_link_libraries (provider ${CONFIG_LIBRARIES} ${BOOST_LIBRARIES} ${BDB_LIBRARIES})
View
24 provider/page_manager.hpp
@@ -10,12 +10,12 @@
#include "common/cache_mt.hpp"
#include "common/config.hpp"
-typedef boost::tuple<boost::uint64_t, buffer_wrapper, boost::uint64_t, std::string> monitored_params_t;
+typedef boost::tuple<boost::uint64_t, buffer_wrapper, buffer_wrapper, std::string> monitored_params_t;
template <class Persistency> class page_manager {
private:
typedef Persistency page_cache_t;
- typedef boost::function<void (const boost::int32_t, const monitored_params_t &) > update_hook_t;
+ typedef boost::function<void (const boost::int32_t, monitored_params_t &) > update_hook_t;
typedef std::vector<update_hook_t> update_hooks_t;
public:
@@ -34,7 +34,8 @@ template <class Persistency> class page_manager {
update_hooks_t update_hooks;
bool compression;
- void exec_hooks(const boost::int32_t rpc_name, buffer_wrapper page_id, boost::uint64_t page_size, const std::string &sender);
+ void exec_hooks(const boost::int32_t rpc_name, buffer_wrapper page_id, buffer_wrapper val,
+ const std::string &sender);
};
template <class Persistency> page_manager<Persistency>::page_manager(page_cache_t *pc, bool c)
@@ -53,7 +54,7 @@ template <class Persistency> rpcreturn_t page_manager<Persistency>::write_page(c
ERROR("could not write page");
return rpcstatus::eres;
} else {
- exec_hooks(PROVIDER_WRITE, params[i], params[i + 1].size(), sender);
+ exec_hooks(PROVIDER_WRITE, params[i], params[i + 1], sender);
DBG("page written: " << params[i]);
}
return rpcstatus::ok;
@@ -73,7 +74,7 @@ template <class Persistency> rpcreturn_t page_manager<Persistency>::read_page(co
INFO("page could not be read: " << params[i]);
result.push_back(buffer_wrapper());
} else {
- exec_hooks(PROVIDER_READ, params[i], data.size(), sender);
+ exec_hooks(PROVIDER_READ, params[i], data, sender);
result.push_back(data);
ok++;
}
@@ -108,15 +109,18 @@ template <class Persistency> rpcreturn_t page_manager<Persistency>::read_partial
<< params[0]);
return rpcstatus::eobj;
}
- result.push_back(buffer_wrapper(data.get() + offset, size, true));
- exec_hooks(PROVIDER_READ, params[0], size, sender);
+ buffer_wrapper result_buffer(data.get() + offset, size, true);
+ result.push_back(result_buffer);
+ exec_hooks(PROVIDER_READ, params[0], result_buffer, sender);
return rpcstatus::ok;
}
-template <class Persistency> void page_manager<Persistency>::exec_hooks(const boost::int32_t rpc_name, buffer_wrapper page_id, const boost::uint64_t ps, const std::string &sender) {
- for (update_hooks_t::iterator i = update_hooks.begin(); i != update_hooks.end(); ++i)
- (*i)(rpc_name, monitored_params_t(page_cache->get_free(), page_id, ps, sender));
// Notifies every registered update hook about an RPC that was just served.
// Each hook receives rpc_name plus a monitored_params_t built from
// (page_cache->get_free(), page_id, val, sender).
// The tuple is a named local, rebuilt per hook, because update_hook_t takes it
// by non-const reference and therefore cannot bind to a temporary.
+template <class Persistency> void page_manager<Persistency>::exec_hooks(const boost::int32_t rpc_name, buffer_wrapper page_id, buffer_wrapper val, const std::string &sender) {
+ for (update_hooks_t::iterator i = update_hooks.begin(); i != update_hooks.end(); ++i) {
+ monitored_params_t mp(page_cache->get_free(), page_id, val, sender);
+ (*i)(rpc_name, mp);
+ }
+}
template <class Persistency> void page_manager<Persistency>::add_listener(update_hook_t hook) {
View
10 provider/pmgr_listener.cpp
@@ -18,19 +18,21 @@ pmgr_listener::pmgr_listener(boost::asio::io_service &io_service,
pmgr_listener::~pmgr_listener() {
}
-void pmgr_listener::update_event(const boost::int32_t name, const monitored_params_t &params) {
+void pmgr_listener::update_event(const boost::int32_t name, monitored_params_t &params) {
switch (name) {
case PROVIDER_WRITE:
free_space = params.get<0>();
if (free_space == 0)
timeout_callback(boost::system::error_code());
- INFO("write_page initiated by " << params.get<3>() << ", page size is: {" << params.get<2>() << "} (WPS)");
+ INFO("write_page initiated by " << params.get<3>() << ", page size is: {"
+ << params.get<2>().size() << "} (WPS)");
INFO("free space has changed, now is: {" << params.get<0>() << "} (FSC)");
break;
case PROVIDER_READ:
nr_read_pages++;
- total_read_size += params.get<2>();
- INFO("read_page initiated by " << params.get<3>() << ", page size is: {" << params.get<2>() << "} (RPS)");
+ total_read_size += params.get<2>().size();
+ INFO("read_page initiated by " << params.get<3>()
+ << ", page size is: {" << params.get<2>().size() << "} (RPS)");
break;
default:
ERROR("Unknown hook type: " << name);
View
4 provider/pmgr_listener.hpp
@@ -13,7 +13,7 @@
class pmgr_listener {
typedef rpc_client<config::socket_namespace> rpc_client_t;
- static const unsigned int UPDATE_TIMEOUT = 5;
+ static const unsigned int UPDATE_TIMEOUT = 30;
std::string phost, pservice, service;
boost::uint64_t free_space, nr_read_pages, total_read_size;
@@ -28,7 +28,7 @@ class pmgr_listener {
const std::string &ph, const std::string &ps,
const boost::uint64_t fs,
const std::string &service);
- void update_event(const boost::int32_t name, const monitored_params_t &params);
+ void update_event(const boost::int32_t name, monitored_params_t &params);
~pmgr_listener();
};
View
13 provider/provider.cpp
@@ -18,19 +18,22 @@ template <class Storage> void run_server(Storage &provider_storage) {
rpc_server<config::socket_namespace> provider_server(io_service);
pmgr_listener plistener(io_service, phost, pservice, ((boost::uint64_t)1 << 20) * total_space, service);
-
provider_storage.add_listener(boost::bind(&pmgr_listener::update_event, boost::ref(plistener), _1, _2));
provider_server.register_rpc(PROVIDER_WRITE,
(rpcserver_extcallback_t)boost::bind(&Storage::write_page,
- boost::ref(provider_storage), _1, _2, _3));
+ boost::ref(provider_storage),
+ _1, _2, _3));
provider_server.register_rpc(PROVIDER_READ,
(rpcserver_extcallback_t)boost::bind(&Storage::read_page,
- boost::ref(provider_storage), _1, _2, _3));
+ boost::ref(provider_storage),
+ _1, _2, _3));
provider_server.register_rpc(PROVIDER_READ_PARTIAL,
(rpcserver_extcallback_t)boost::bind(&Storage::read_partial_page,
- boost::ref(provider_storage), _1, _2, _3));
+ boost::ref(provider_storage),
+ _1, _2, _3));
- provider_server.start_listening(config::socket_namespace::endpoint(config::socket_namespace::v4(), atoi(service.c_str())));
+ provider_server.start_listening(config::socket_namespace::endpoint(config::socket_namespace::v4(),
+ atoi(service.c_str())));
INFO("listening on " << provider_server.pretty_format_str() << ", offering max. " << total_space << " MB");
io_service.run();
}
View
19 provider/sdht.cpp
@@ -2,6 +2,7 @@
#include "libconfig.h++"
#include "page_manager.hpp"
+#include "meta_listener.hpp"
#include "rpc/rpc_server.hpp"
#include "bdb_bw_map.hpp"
#include "null_bw_map.hpp"
@@ -16,20 +17,24 @@ template <class Storage> void run_server(Storage &provider_storage) {
boost::asio::io_service io_service;
rpc_server<config::socket_namespace> provider_server(io_service);
+ meta_listener mlistener;
+ provider_storage.add_listener(boost::bind(&meta_listener::update_event, boost::ref(mlistener), _1, _2));
provider_server.register_rpc(PROVIDER_WRITE,
(rpcserver_extcallback_t)boost::bind(&Storage::write_page,
- boost::ref(provider_storage), _1, _2, _3));
+ boost::ref(provider_storage),
+ _1, _2, _3));
provider_server.register_rpc(PROVIDER_READ,
(rpcserver_extcallback_t)boost::bind(&Storage::read_page,
- boost::ref(provider_storage), _1, _2, _3));
- provider_server.start_listening(config::socket_namespace::endpoint(config::socket_namespace::v4(), atoi(service.c_str())));
- INFO("listening on " << provider_server.pretty_format_str() << ", offering max. " << total_space << " MB");
+ boost::ref(provider_storage),
+ _1, _2, _3));
+ provider_server.start_listening(config::socket_namespace::endpoint(config::socket_namespace::v4(),
+ atoi(service.c_str())));
+ INFO("listening on " << provider_server.pretty_format_str()
+ << ", offering max. " << total_space << " MB");
io_service.run();
}
-int main(int argc, char *argv[]) {
-
-
+int main(int argc, char *argv[]) {
if (argc != 2 && argc != 3) {
cout << "Usage: sdht <config_file> [<port>]" << endl;
return 1;

0 comments on commit d48b109

Please sign in to comment.
Something went wrong with that request. Please try again.