Skip to content

Commit

Permalink
shared storage with mutex
Browse files Browse the repository at this point in the history
shared lock for reads and unique lock for writes

declare cache as object and not pointer in serach engine data to simulate singleton declaration

put a lock infront of the clear function to make it threadsafe

remove clear function from cache because cache will never get dropped

unit tests

unit tests and timestamp as part of key

cache generations

hash the key

500 mb

1000 mb

250 mb

rebase against implement-cache
  • Loading branch information
ghoshkaj committed Apr 9, 2018
1 parent e0ccff3 commit 951daed
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 68 deletions.
27 changes: 18 additions & 9 deletions include/engine/routing_algorithms/routing_base_ch.hpp
Expand Up @@ -323,11 +323,16 @@ EdgeDuration calculateEBGNodeAnnotations(const DataFacade<Algorithm> &facade,

std::get<2>(edge) = true; // mark that this edge will now be processed

if (unpacking_cache.IsEdgeInCache(std::make_tuple(
std::get<0>(edge), std::get<1>(edge), facade.GetExcludeIndex())))
if (unpacking_cache.IsEdgeInCache(std::make_tuple(std::get<0>(edge),
std::get<1>(edge),
facade.GetExcludeIndex(),
facade.GetTimestamp())))
{
EdgeDuration duration = unpacking_cache.GetDuration(std::make_tuple(
std::get<0>(edge), std::get<1>(edge), facade.GetExcludeIndex()));
EdgeDuration duration =
unpacking_cache.GetDuration(std::make_tuple(std::get<0>(edge),
std::get<1>(edge),
facade.GetExcludeIndex(),
facade.GetTimestamp()));
duration_stack.emplace(duration);
}
else
Expand Down Expand Up @@ -369,8 +374,10 @@ EdgeDuration calculateEBGNodeAnnotations(const DataFacade<Algorithm> &facade,
}
else
{
auto temp = std::make_tuple(
std::get<0>(edge), std::get<1>(edge), facade.GetExcludeIndex());
auto temp = std::make_tuple(std::get<0>(edge),
std::get<1>(edge),
facade.GetExcludeIndex(),
facade.GetTimestamp());
// compute the duration here and put it onto the duration stack using method
// similar to annotatePath but smaller
EdgeDuration duration =
Expand All @@ -391,9 +398,11 @@ EdgeDuration calculateEBGNodeAnnotations(const DataFacade<Algorithm> &facade,
duration_stack.pop();
EdgeDuration duration = edge1 + edge2;
duration_stack.emplace(duration);
unpacking_cache.AddEdge(
std::make_tuple(std::get<0>(edge), std::get<1>(edge), facade.GetExcludeIndex()),
duration);
unpacking_cache.AddEdge(std::make_tuple(std::get<0>(edge),
std::get<1>(edge),
facade.GetExcludeIndex(),
facade.GetTimestamp()),
duration);
}
}

Expand Down
9 changes: 4 additions & 5 deletions include/engine/search_engine_data.hpp
Expand Up @@ -6,7 +6,8 @@
#include "util/query_heap.hpp"
#include "util/typedefs.hpp"

#include <boost/thread/tss.hpp>
// #include <boost/thread/tss.hpp>
#include <boost/thread.hpp>

namespace osrm
{
Expand Down Expand Up @@ -47,7 +48,6 @@ template <> struct SearchEngineData<routing_algorithms::ch::Algorithm>

using SearchEngineHeapPtr = boost::thread_specific_ptr<QueryHeap>;
using ManyToManyHeapPtr = boost::thread_specific_ptr<ManyToManyQueryHeap>;
using UnpackingCachePtr = boost::thread_specific_ptr<UnpackingCache>;

static SearchEngineHeapPtr forward_heap_1;
static SearchEngineHeapPtr reverse_heap_1;
Expand All @@ -56,7 +56,8 @@ template <> struct SearchEngineData<routing_algorithms::ch::Algorithm>
static SearchEngineHeapPtr forward_heap_3;
static SearchEngineHeapPtr reverse_heap_3;
static ManyToManyHeapPtr many_to_many_heap;
static UnpackingCachePtr unpacking_cache;

static UnpackingCache unpacking_cache;

void InitializeOrClearFirstThreadLocalStorage(unsigned number_of_nodes);

Expand All @@ -65,8 +66,6 @@ template <> struct SearchEngineData<routing_algorithms::ch::Algorithm>
void InitializeOrClearThirdThreadLocalStorage(unsigned number_of_nodes);

void InitializeOrClearManyToManyThreadLocalStorage(unsigned number_of_nodes);

void InitializeOrClearUnpackingCacheThreadLocalStorage(unsigned timestamp);
};

struct MultiLayerDijkstraHeapData
Expand Down
34 changes: 14 additions & 20 deletions include/engine/unpacking_cache.hpp
Expand Up @@ -2,10 +2,7 @@
#define UNPACKING_CACHE_HPP

#include <boost/optional/optional_io.hpp>
<<<<<<< HEAD
#include <boost/thread.hpp>
=======
>>>>>>> 68659b398... set up for computing durations while unpacking them

#include "../../third_party/compute_detail/lru_cache.hpp"
#include "util/typedefs.hpp"
Expand All @@ -16,7 +13,7 @@ namespace engine
{
typedef unsigned char ExcludeIndex;
typedef unsigned Timestamp;
typedef std::tuple<NodeID, NodeID, ExcludeIndex> Key;
typedef std::tuple<NodeID, NodeID, ExcludeIndex, Timestamp> Key;
typedef std::size_t HashedKey;

struct HashKey
Expand All @@ -26,11 +23,13 @@ struct HashKey
std::size_t h1 = std::hash<NodeID>{}(std::get<0>(key));
std::size_t h2 = std::hash<NodeID>{}(std::get<1>(key));
std::size_t h3 = std::hash<ExcludeIndex>{}(std::get<2>(key));
std::size_t h4 = std::hash<Timestamp>{}(std::get<3>(key));

std::size_t seed = 0;
boost::hash_combine(seed, h1);
boost::hash_combine(seed, h2);
boost::hash_combine(seed, h3);
boost::hash_combine(seed, h4);

return seed;
}
Expand All @@ -40,7 +39,7 @@ class UnpackingCache
{
private:
boost::compute::detail::lru_cache<HashedKey, EdgeDuration> m_cache;
unsigned m_current_data_timestamp = 0;
boost::shared_mutex m_shared_access;

public:
// TO FIGURE OUT HOW MANY LINES TO INITIALIZE CACHE TO:
Expand All @@ -56,7 +55,7 @@ class UnpackingCache
// = n * std::size_t
// = n * 8 bytes
// Total = n * 20 bytes
// Total cache size: 500 mb = 500 * 1024 *1024 bytes = 524288000 bytes
// Total cache size: 500 mb = 1000 * 1024 *1024 bytes = 524288000 bytes

// THREAD LOCAL STORAGE
// Number of lines we need = 524288000 / 20 / number of threads = 26214400 / number of threads
Expand All @@ -66,42 +65,37 @@ class UnpackingCache
// 2 threads: 26214400 / 2 = 13107200

// SHARED STORAGE CACHE
// Number of lines we need for shared storage cache = 524288000 / 20 = 26214400
// Number of lines for shared storage cache 1000 mb = 1048576000 / 20 = 52428800
// Number of lines for shared storage cache 500 mb = 524288000 / 20 = 26214400
// Number of lines for shared storage cache 250 mb = 262144000 / 20 = 13107200

UnpackingCache(unsigned timestamp) : m_cache(13107200), m_current_data_timestamp(timestamp){};
UnpackingCache() : m_cache(26214400){};

UnpackingCache(std::size_t cache_size, unsigned timestamp)
: m_cache(cache_size), m_current_data_timestamp(timestamp){};

void Clear(unsigned new_data_timestamp)
{
if (m_current_data_timestamp != new_data_timestamp)
{
m_cache.clear();
m_current_data_timestamp = new_data_timestamp;
}
}
UnpackingCache(std::size_t cache_size) : m_cache(cache_size){};

bool IsEdgeInCache(Key edge)
{
HashedKey hashed_edge = HashKey{}(edge);
boost::shared_lock<boost::shared_mutex> lock(m_shared_access);
return m_cache.contains(hashed_edge);
}

void AddEdge(Key edge, EdgeDuration duration)
{
HashedKey hashed_edge = HashKey{}(edge);
boost::unique_lock<boost::shared_mutex> lock(m_shared_access);
m_cache.insert(hashed_edge, duration);
}

EdgeDuration GetDuration(Key edge)
{
HashedKey hashed_edge = HashKey{}(edge);
boost::shared_lock<boost::shared_mutex> lock(m_shared_access);
boost::optional<EdgeDuration> duration = m_cache.get(hashed_edge);
return duration ? *duration : MAXIMAL_EDGE_DURATION;
}
};
} // engine
} // osrm

#endif // UNPACKING_CACHE_HPP
#endif // UNPACKING_CACHE_HPP
5 changes: 1 addition & 4 deletions src/engine/routing_algorithms/many_to_many_ch.cpp
Expand Up @@ -225,9 +225,6 @@ std::vector<EdgeDuration> manyToManySearch(SearchEngineData<ch::Algorithm> &engi
std::vector<NodeBucket> search_space_with_buckets;
std::vector<NodeID> packed_leg;

engine_working_data.InitializeOrClearUnpackingCacheThreadLocalStorage(
facade.GetTimestamp()); // always pass in the timestamp and clear if it's different

// Populate buckets with paths from all accessible nodes to destinations via backward searches
for (std::uint32_t column_idx = 0; column_idx < target_indices.size(); ++column_idx)
{
Expand Down Expand Up @@ -321,7 +318,7 @@ std::vector<EdgeDuration> manyToManySearch(SearchEngineData<ch::Algorithm> &engi
ch::calculateEBGNodeAnnotations(facade,
packed_leg.begin(),
packed_leg.end(),
*engine_working_data.unpacking_cache.get());
engine_working_data.unpacking_cache);

// check the direction of travel to figure out how to calculate the offset to/from
// the source/target
Expand Down
14 changes: 1 addition & 13 deletions src/engine/search_engine_data.cpp
Expand Up @@ -14,7 +14,7 @@ SearchEngineData<CH>::SearchEngineHeapPtr SearchEngineData<CH>::reverse_heap_2;
SearchEngineData<CH>::SearchEngineHeapPtr SearchEngineData<CH>::forward_heap_3;
SearchEngineData<CH>::SearchEngineHeapPtr SearchEngineData<CH>::reverse_heap_3;
SearchEngineData<CH>::ManyToManyHeapPtr SearchEngineData<CH>::many_to_many_heap;
SearchEngineData<CH>::UnpackingCachePtr SearchEngineData<CH>::unpacking_cache;
UnpackingCache SearchEngineData<CH>::unpacking_cache;

void SearchEngineData<CH>::InitializeOrClearFirstThreadLocalStorage(unsigned number_of_nodes)
{
Expand Down Expand Up @@ -91,18 +91,6 @@ void SearchEngineData<CH>::InitializeOrClearManyToManyThreadLocalStorage(unsigne
}
}

void SearchEngineData<CH>::InitializeOrClearUnpackingCacheThreadLocalStorage(unsigned timestamp)
{
if (unpacking_cache.get())
{
unpacking_cache->Clear(timestamp);
}
else
{
unpacking_cache.reset(new UnpackingCache(timestamp));
}
}

// MLD
using MLD = routing_algorithms::mld::Algorithm;
SearchEngineData<MLD>::SearchEngineHeapPtr SearchEngineData<MLD>::forward_heap_1;
Expand Down
36 changes: 19 additions & 17 deletions unit_tests/engine/unpacking_cache.cpp
Expand Up @@ -16,17 +16,17 @@ BOOST_AUTO_TEST_CASE(add_edge_and_check_existence)
{
// Arrange (Setup)
unsigned timestamp = 1522782542;
UnpackingCache cache(1, timestamp);
UnpackingCache cache(1);

auto key = std::make_tuple(1, 1, 1);
auto key = std::make_tuple(1, 1, 1, timestamp);
auto value = 1;

// Act
cache.AddEdge(key, value);

// Assert
BOOST_CHECK(cache.IsEdgeInCache(key) == true);
BOOST_CHECK(cache.IsEdgeInCache(std::make_tuple(2, 2, 2)) == false);
BOOST_CHECK(cache.IsEdgeInCache(std::make_tuple(2, 2, 2, timestamp)) == false);

auto result = cache.GetDuration(key);
BOOST_CHECK_EQUAL(result, value);
Expand All @@ -35,13 +35,12 @@ BOOST_AUTO_TEST_CASE(add_edge_and_check_existence)
BOOST_AUTO_TEST_CASE(cache_invalidation)
{
// Arrange (Setup)
unsigned timestamp = 1522782542;
UnpackingCache cache(1, timestamp);
UnpackingCache cache(1);

auto key1 = std::make_tuple(1, 1, 1);
auto key1 = std::make_tuple(1, 1, 1, 1522782542);
auto value1 = 1;

auto key2 = std::make_tuple(2, 2, 2);
auto key2 = std::make_tuple(2, 2, 2, 1522782543);
auto value2 = 2;

// Act
Expand All @@ -56,31 +55,34 @@ BOOST_AUTO_TEST_CASE(cache_invalidation)
BOOST_CHECK_EQUAL(result, value2);
}

BOOST_AUTO_TEST_CASE(new_data)
BOOST_AUTO_TEST_CASE(store_generations)
{
// Arrange (Setup)
UnpackingCache cache(2);

unsigned timestamp1 = 1522782542;
unsigned timestamp2 = 1522782543;

UnpackingCache cache(1, timestamp1);

auto key1 = std::make_tuple(1, 2, 3);
auto key1 = std::make_tuple(1, 1, 1, timestamp1);
auto value1 = 1;
auto key2 = std::make_tuple(2, 3, 4);

auto key2 = std::make_tuple(1, 1, 1, timestamp2);
auto value2 = 2;

// Act
cache.AddEdge(key1, value1);
cache.Clear(timestamp2);
cache.AddEdge(key2, value2);

// Assert
BOOST_CHECK(cache.IsEdgeInCache(key1) == false);
BOOST_CHECK(cache.IsEdgeInCache(key1) == true);
BOOST_CHECK(cache.IsEdgeInCache(key2) == true);
BOOST_CHECK(cache.IsEdgeInCache(std::make_tuple(2, 2, 2)) == false);
BOOST_CHECK(cache.IsEdgeInCache(std::make_tuple(2, 2, 2, timestamp1)) == false);

auto result = cache.GetDuration(key2);
BOOST_CHECK_EQUAL(result, value2);
auto result1 = cache.GetDuration(key1);
BOOST_CHECK_EQUAL(result1, value1);

auto result2 = cache.GetDuration(key2);
BOOST_CHECK_EQUAL(result2, value2);
}

BOOST_AUTO_TEST_SUITE_END()

0 comments on commit 951daed

Please sign in to comment.