Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sql partition aware routing[API-1862] #1167

Merged
merged 39 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9187896
sql aware draft
akeles85 Mar 1, 2023
6337517
sql_result update
akeles85 Mar 1, 2023
d24577d
LRU Cache Interface
akeles85 Mar 2, 2023
9918fc8
std::priority_queue used
akeles85 Mar 3, 2023
8a37050
sql service implementation
akeles85 Mar 9, 2023
65fcca8
revert unused file
akeles85 Mar 9, 2023
0af4c06
clang format
akeles85 Mar 9, 2023
ca96b11
LRU Test added
akeles85 Mar 9, 2023
f6d5967
LRU Test added
akeles85 Mar 9, 2023
c4e2c7e
basic type tests
akeles85 Mar 13, 2023
65c9ede
unit tests for complex types
akeles85 Mar 14, 2023
d7c8736
test case update
akeles85 Mar 14, 2023
d171c8d
test added for routing
akeles85 Mar 16, 2023
981f313
clang format
akeles85 Mar 16, 2023
cc41570
3 members for sql test
akeles85 Mar 16, 2023
7b6df81
Merge branch 'sql_partition_aware' into concurrent_map
akeles85 Mar 16, 2023
5a5734c
Merge pull request #6 from akeles85/concurrent_map
akeles85 Mar 16, 2023
8062180
cluster version check added
akeles85 Mar 16, 2023
1d20dbd
test case failure fix
akeles85 Mar 16, 2023
72469a9
member3_ reset
akeles85 Mar 16, 2023
6a1f05d
additional test cases
akeles85 Mar 17, 2023
89fc51f
statement partition arg index set
akeles85 Mar 17, 2023
7279125
tdd review fix
akeles85 Mar 17, 2023
37a24e6
clang format
akeles85 Mar 17, 2023
bcea0c0
comments added
akeles85 Mar 18, 2023
c27da7a
clang format
akeles85 Mar 18, 2023
236bdaf
Merge branch 'hazelcast:master' into sql_partition_aware
akeles85 Mar 20, 2023
db360cc
jenkins fix
akeles85 Mar 20, 2023
0db8e4c
Merge branch 'sql_partition_aware' of github.com:akeles85/hazelcast-c…
akeles85 Mar 20, 2023
b3d6a57
compiler error fix for jenkins
akeles85 Mar 20, 2023
7aad67f
jenkins linux compiler error
akeles85 Mar 20, 2023
ccfe2d5
read_optimized_lru_cache removed from public API
akeles85 Mar 20, 2023
150f63c
code review fixes
akeles85 Mar 20, 2023
8d3080c
initial value of lock
akeles85 Mar 20, 2023
4143e37
rename variable
akeles85 Mar 22, 2023
f63e7c1
clang format
akeles85 Mar 22, 2023
142c5a3
Merge branch 'master' into sql_partition_aware
akeles85 Mar 30, 2023
cdb70d0
compiler error fixed
akeles85 Mar 30, 2023
0ae6fb0
windows bat file is updated
akeles85 Mar 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions hazelcast/include/hazelcast/client/client_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class HAZELCAST_API client_properties

const client_property& cloud_base_url() const;

const client_property& partition_arg_cache_size() const;

/**
* Client will be sending heartbeat messages to members and this is the
* timeout. If there is no any message passing between client and member
Expand Down Expand Up @@ -264,6 +266,18 @@ class HAZELCAST_API client_properties
static constexpr const char* CLOUD_URL_BASE_DEFAULT =
"api.viridian.hazelcast.com";

/**
* Parametrized SQL queries touching only a single partition benefit from
* using the partition owner as the query coordinator, if the partition
* owner can be determined from one of the query parameters. When such a
* query is executed, the cluster sends the index of such argument to the
* client. This parameter configures the size of the cache the client uses
* for storing this information.
*/
static constexpr const char* PARTITION_ARGUMENT_CACHE_SIZE =
"hazelcast.client.sql.partition.argument.cache.size";
static constexpr const char* PARTITION_ARGUMENT_CACHE_SIZE_DEFAULT = "1024";

/**
* Returns the configured boolean value of a {@link ClientProperty}.
*
Expand Down Expand Up @@ -314,6 +328,7 @@ class HAZELCAST_API client_properties
client_property backup_timeout_millis_;
client_property fail_on_indeterminate_state_;
client_property cloud_base_url_;
client_property partition_arg_cache_size_;

std::unordered_map<std::string, std::string> properties_map_;
};
Expand Down
203 changes: 203 additions & 0 deletions hazelcast/include/hazelcast/client/sql/impl/read_optimized_lru_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <atomic>
#include <cstdint>
#include <memory>
#include <queue>
#include <utility>

#include "hazelcast/util/SynchronizedMap.h"
#include "hazelcast/util/export.h"

namespace hazelcast {
namespace client {
namespace sql {
namespace impl {

/**
* Implementation of an LRU cache optimized for read-heavy use cases.
* <p>
* It stores the entries in a {SynchronizedMap}, along with the last
* access time. It allows the size to grow beyond the capacity, up to
* `cleanup_threshold_`, at which point the inserting thread will remove a batch
* of the eldest items in two passes.
* <p>
* The cleanup process isn't synchronized to guarantee that the capacity is not
* exceeded. The cache is available during the cleanup for reads and writes. If
* there's a large number of writes by many threads, the one thread doing the
* cleanup might not be quick enough and there's no upper bound on the actual
* size of the cache. This is done to optimize the happy path when the keys fit
* into the cache.
*/
template<typename K, typename V>
class read_optimized_lru_cache
{
public:
/**
* @param capacity Capacity of the cache
* @param cleanup_threshold The size at which the cache will clean up oldest
* entries in batch. `cleanup_threshold - capacity` entries will be
* removed
* @throws exception::illegal_argument if capacity is smaller or equal to 0,
* or if the cleanup_threshold is smaller than capacity
*/
explicit read_optimized_lru_cache(const uint32_t capacity,
const uint32_t cleanup_threshold)
{
if (capacity == 0) {
BOOST_THROW_EXCEPTION(
client::exception::illegal_argument("capacity == 0"));
}
if (cleanup_threshold <= capacity) {
BOOST_THROW_EXCEPTION(client::exception::illegal_argument(
"cleanupThreshold <= capacity"));
}

capacity_ = capacity;
cleanup_threshold_ = cleanup_threshold;
}

/**
 * Looks the key up via get() and falls back to a caller-supplied value when
 * the lookup yields nothing.
 *
 * @param key the key of the cache entry
 * @param default_value the value handed back when the key is not cached
 * @returns the value cached for the key, or default_value if get(key)
 * returns nullptr
 */
std::shared_ptr<V> get_or_default(const K& key,
                                  const std::shared_ptr<V>& default_value)
{
    if (const auto cached = get(key)) {
        return cached;
    }
    return default_value;
}

/**
 * Returns the value cached for the specified key and refreshes the
 * last-access timestamp of the entry.
 *
 * @param key the key of the cache entry
 * @returns the value cached for the key, or nullptr if this cache contains
 * no mapping for the key
 */
std::shared_ptr<V> get(const K& key)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not return optional which makes small value optimizations for you?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it to be consistent with SynchronizedMap, so I prefer to leave it as it is.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may have another version of synchronized map to work with optional, what do you think? They are very similar.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Data is stored in synchronized map, and we just increment the usage count of the shared_ptr with get (which is done via atomic instruction). If we use optional, wouldn't it be expensive, because this time we will copy the data for storing it in optional?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this specific case, the value is an (int32 + int64_t) struct (std::shared_ptr<impl::read_optimized_lru_cache<std::string, int32_t>> partition_argument_index_cache_;) hence, copy would not be a big problem for this specific case. We have no other use of read_optimized_lru_cache structure any other places and have no such concern at the moment. Hence, I thought that it would have been better than using shared_ptr.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right, I have changed the implementation. (#1188)

{
    // A null result from the underlying map is treated as a cache miss.
    const auto entry = cache_.get(key);
    if (entry == nullptr) {
        return nullptr;
    }

    // Refresh the access time so do_cleanup() sees this entry as recently
    // used.
    entry->touch();

    // Hand back a copy of the stored value. The copy is created as V, the
    // value type of this cache; a hard-coded int32_t would only be correct
    // for read_optimized_lru_cache<K, int32_t>.
    return std::make_shared<V>(entry->value_);
}

/**
 * Stores a copy of the given value under the key, wrapped together with a
 * fresh timestamp. If cache_.put() reports no previous entry for the key and
 * the cache has grown beyond cleanup_threshold_, the calling thread runs
 * do_cleanup().
 *
 * @param key the key of the cache entry
 * @param value the value of the cache entry
 * @throws exception::illegal_argument if the value equals to nullptr
 */
void put(const K& key, const std::shared_ptr<V>& value)
{
    if (!value) {
        BOOST_THROW_EXCEPTION(client::exception::illegal_argument(
          "Null values are disallowed"));
    }

    const bool key_was_absent =
      cache_.put(key, std::make_shared<value_and_timestamp<V>>(*value)) ==
      nullptr;
    if (!key_was_absent) {
        // Replacing an existing entry is not treated as growth, so the size
        // is not checked.
        return;
    }

    if (cache_.size() > cleanup_threshold_) {
        do_cleanup();
    }
}

/**
 * Removes the cached value for the given key by delegating to the
 * underlying map's remove().
 *
 * @param key the key of the cache entry
 */
void remove(const K& key) { cache_.remove(key); }

protected:
/**
 * Helper class to hold an immutable value together with the time it was
 * last touched.
 *
 * The timestamp is atomic because touch() is called by readers (see get())
 * without holding any lock, while do_cleanup() may read the same timestamp
 * from another thread at the same time.
 */
template<typename T>
class value_and_timestamp
{
public:
    const T value_;
    std::atomic<int64_t> timestamp_;

    /**
     * @param value the value to store; the timestamp is set to the current
     * time
     */
    value_and_timestamp(T value)
      : value_(std::move(value))
      , timestamp_(util::current_time_nanos())
    {
    }

    /**
     * Sets the timestamp to the current time. The timestamp is a standalone
     * recency hint and no other data is published through it, so a relaxed
     * store is sufficient.
     */
    void touch()
    {
        timestamp_.store(util::current_time_nanos(),
                         std::memory_order_relaxed);
    }
};

util::SynchronizedMap<K, value_and_timestamp<V>> cache_;

private:
/**
 * Removes the least recently touched entries so that the cache shrinks back
 * towards capacity_.
 *
 * Only one thread cleans up at a time; a thread that finds cleanup_lock_
 * already taken returns immediately. The cleanup works in two passes: the
 * first pass determines the timestamp threshold, the second pass removes
 * every entry whose timestamp is not newer than that threshold. Entries
 * sharing the threshold timestamp are all removed, so slightly more than
 * `size - capacity_` entries may be evicted.
 */
void do_cleanup()
{
    bool expected = false;
    // if no thread is cleaning up, we'll do it
    if (!cleanup_lock_.compare_exchange_strong(expected, true)) {
        return;
    }

    util::finally release_lock(
      [this]() { this->cleanup_lock_.store(false); });

    // Read the size exactly once. The cache can be modified concurrently;
    // reading it a second time for the subtraction could observe a value
    // below capacity_, making the unsigned difference wrap around and
    // causing every entry to be evicted.
    const auto current_size = cache_.size();
    if (current_size <= capacity_) {
        // this can happen if the cache is concurrently modified
        return;
    }
    const auto entries_to_remove = current_size - capacity_;

    // Max heap bounded to entries_to_remove elements: after the pass it
    // holds the smallest (oldest) timestamps, with the newest of them on
    // top.
    std::priority_queue<int64_t> oldest_timestamps;

    // 1st pass
    const auto values = cache_.values();
    for (const auto& value_and_timestamp : values) {
        oldest_timestamps.push(value_and_timestamp->timestamp_);
        if (oldest_timestamps.size() > entries_to_remove) {
            oldest_timestamps.pop();
        }
    }

    // find out the highest value in the queue - the value, below which
    // entries will be removed
    if (oldest_timestamps.empty()) {
        // this can happen if the cache is concurrently modified
        return;
    }
    const int64_t remove_threshold = oldest_timestamps.top();

    // 2nd pass
    cache_.remove_values_if(
      [remove_threshold](const value_and_timestamp<V>& v) -> bool {
          return (v.timestamp_ <= remove_threshold);
      });
}

std::atomic<bool> cleanup_lock_{ false };
uint32_t capacity_;
uint32_t cleanup_threshold_;
};

} // namespace impl
} // namespace sql
} // namespace client
} // namespace hazelcast
19 changes: 18 additions & 1 deletion hazelcast/include/hazelcast/client/sql/sql_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "hazelcast/client/sql/sql_result.h"
#include "hazelcast/client/sql/sql_statement.h"
#include "hazelcast/client/sql/hazelcast_sql_exception.h"
#include "hazelcast/client/sql/impl/read_optimized_lru_cache.h"

#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
#pragma warning(push)
Expand Down Expand Up @@ -127,12 +128,17 @@ class HAZELCAST_API sql_service
boost::future<std::shared_ptr<sql_result>> execute(
const sql_statement& statement);

std::shared_ptr<impl::read_optimized_lru_cache<std::string, int32_t>>
partition_argument_index_cache_;

private:
friend client::impl::hazelcast_client_instance_impl;
friend sql_result;

client::spi::ClientContext& client_context_;

bool is_smart_routing_;

struct sql_execute_response_parameters
{
int64_t update_count;
Expand All @@ -141,6 +147,8 @@ class HAZELCAST_API sql_service
boost::optional<impl::sql_error> error;
bool is_infinite_rows = false;
bool is_infinite_rows_exist = false;
int32_t partition_argument_index = -1;
bool is_partition_argument_index_exists = false;
};

struct sql_fetch_response_parameters
Expand All @@ -152,6 +160,12 @@ class HAZELCAST_API sql_service
explicit sql_service(client::spi::ClientContext& context);

std::shared_ptr<connection::Connection> query_connection();
std::shared_ptr<connection::Connection> query_connection(
int32_t partition_id);

boost::optional<int32_t> extract_partition_id(
const sql_statement& statement,
const int32_t arg_index) const;

void rethrow(const std::exception& exc_ptr);
void rethrow(const std::exception& cause_ptr,
Expand All @@ -160,10 +174,13 @@ class HAZELCAST_API sql_service
boost::uuids::uuid client_id();

std::shared_ptr<sql_result> handle_execute_response(
const std::string& sql_query,
const int32_t original_partition_argument_index,
protocol::ClientMessage& msg,
std::shared_ptr<connection::Connection> connection,
impl::query_id id,
int32_t cursor_buffer_size);
int32_t cursor_buffer_size,
std::weak_ptr<std::atomic<int32_t>> statement_par_arg_index_ptr);

static sql_execute_response_parameters decode_execute_response(
protocol::ClientMessage& msg);
Expand Down
21 changes: 21 additions & 0 deletions hazelcast/include/hazelcast/client/sql/sql_statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ class HAZELCAST_API sql_statement
*/
sql_statement& expected_result_type(sql_expected_result_type type);

/**
* Get the partition argument index value
*
* @return partition argument index, -1 if not set.
*/
std::shared_ptr<std::atomic<int32_t>> partition_argument_index() const;

/**
* Gets the schema name.
*
Expand Down Expand Up @@ -224,12 +231,26 @@ class HAZELCAST_API sql_statement

sql_statement(spi::ClientContext& client_context, std::string query);

/**
* Set the partition argument index. If there's no such argument, use -1.
* <p>
* Setting a wrong argument index will not cause incorrect query results,
* but might cause performance degradation due to more network
* communication. Setting a value higher than the actual number of arguments
* will have no effect.
*
* @param partition_argument_index index of the partition-determining
* argument of the statement
*/
sql_statement& partition_argument_index(int32_t partition_argument_index);

std::string sql_;
std::vector<data> serialized_parameters_;
int32_t cursor_buffer_size_;
std::chrono::milliseconds timeout_;
sql::sql_expected_result_type expected_result_type_;
boost::optional<std::string> schema_;
std::shared_ptr<std::atomic<int32_t>> partition_argument_index_;

serialization_service& serialization_service_;

Expand Down
18 changes: 18 additions & 0 deletions hazelcast/include/hazelcast/util/SynchronizedMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,24 @@ class SynchronizedMap
return internal_map_.empty();
}

/**
 * Walks the map from beginning to end while holding the map lock and erases
 * every entry whose stored value is non-null and for which the predicate
 * returns true.
 *
 * @param comp Predicate invoked with a reference to each non-null stored
 * value; the entry is removed when it returns true.
 */
template<typename Predicate>
void remove_values_if(Predicate comp)
{
    std::lock_guard<std::mutex> guard(map_lock_);

    auto position = internal_map_.begin();
    while (position != internal_map_.end()) {
        // Null values are never handed to the predicate and are always kept.
        const bool erase_entry =
          position->second != nullptr && comp(*(position->second));
        if (!erase_entry) {
            ++position;
            continue;
        }
        // erase() returns the iterator following the removed element.
        position = internal_map_.erase(position);
    }
}

private:
std::unordered_map<K, std::shared_ptr<V>, Hash> internal_map_;
mutable std::mutex map_lock_;
Expand Down
8 changes: 8 additions & 0 deletions hazelcast/src/hazelcast/client/client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,8 @@ client_properties::client_properties(
, fail_on_indeterminate_state_(FAIL_ON_INDETERMINATE_OPERATION_STATE,
FAIL_ON_INDETERMINATE_OPERATION_STATE_DEFAULT)
, cloud_base_url_(CLOUD_URL_BASE, CLOUD_URL_BASE_DEFAULT)
, partition_arg_cache_size_(PARTITION_ARGUMENT_CACHE_SIZE,
PARTITION_ARGUMENT_CACHE_SIZE_DEFAULT)
, properties_map_(properties)
{}

Expand Down Expand Up @@ -1202,6 +1204,12 @@ client_properties::cloud_base_url() const
return cloud_base_url_;
}

// Returns the client property that is constructed from
// PARTITION_ARGUMENT_CACHE_SIZE and PARTITION_ARGUMENT_CACHE_SIZE_DEFAULT in
// the client_properties constructor.
const client_property&
client_properties::partition_arg_cache_size() const
{
    return partition_arg_cache_size_;
}

namespace exception {
iexception::iexception(std::string exception_name,
std::string source,
Expand Down