Skip to content

Commit

Permalink
sql_page_iterator_sync is added.
Browse files Browse the repository at this point in the history
  • Loading branch information
OzanCansel committed Dec 16, 2022
1 parent fca0230 commit fc694a4
Show file tree
Hide file tree
Showing 4 changed files with 379 additions and 0 deletions.
1 change: 1 addition & 0 deletions examples/sql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ add_executable(sql_query_with_portable sql_query_with_portable.cpp)
add_executable(sql_cancellation_example sql_cancellation_example.cpp)
add_executable(sql_json_example sql_json_example.cpp)
add_executable(sql_order_by_limit_offset sql_order_by_limit_offset.cpp)
add_executable(sql_page_iterator_sync sql_page_iterator_sync.cpp)
225 changes: 225 additions & 0 deletions examples/sql/sql_page_iterator_sync.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@

/*
* Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <iterator>

#include <boost/algorithm/string.hpp>

#include <hazelcast/client/hazelcast_client.h>

using hazelcast::client::hazelcast_client;

void
populate_map(hazelcast_client&);
void
create_mapping(hazelcast_client&);
void
for_loop(hazelcast_client&);
void
algorithm_copy(hazelcast_client&);
void
algorithm_filter(hazelcast_client&);
void
timeout(hazelcast_client&);

/**
* Normally, cpp-client provides an async api for every features.
* But for sake of convenience and similar usages with native C++ iterators
* cpp-client provides sync page iterator. So `page_iterator_sync` is a blocking
* iterator. It wraps the `page_iterator` and allow it to be used in sync
* manner. This example demonstrates how to use `page_iterator_sync` and what
* the uses cases are.
*/
int
main()
{
auto hz = hazelcast::new_client().get();

// Preparation
populate_map(hz);
create_mapping(hz);

// Use cases, examples
for_loop(hz);
algorithm_copy(hz);
algorithm_filter(hz);
timeout(hz);

return 0;
}

void
populate_map(hazelcast_client& client)
{
// Populate a map before using it in sql.
auto map = client.get_map("integers").get();

for (int i = 0; i < 100; ++i) {
map->put(i, i).get();
}
}

void
create_mapping(hazelcast_client& client)
{
// Create mapping for the integers.
// This needs to be done only once per map.
// It is required to use a map in SQL query.
auto result = client.get_sql()
.execute(R"(
CREATE OR REPLACE MAPPING integers
TYPE IMap
OPTIONS (
'keyFormat' = 'int',
'valueFormat' = 'int'
)
)")
.get();
}

std::shared_ptr<hazelcast::client::sql::sql_result>
select_numbers(hazelcast::client::hazelcast_client& client)
{
using namespace hazelcast::client::sql;

sql_statement statement(client, "SELECT * FROM integers");

// Set cursor buffer size to 5
// So there will be 20 pages(100 / 5 = 20)
statement.cursor_buffer_size(5);

return client.get_sql().execute(statement).get();
}

void
seperator(const std::string& text = std::string{})
{
std::string output(60, '=');
boost::replace_first(output, std::string(text.size(), '='), text);

output = std::string(20, '=') + output;

std::cout << output << std::endl;
}

void
for_loop(hazelcast_client& client)
{
seperator("for_loop() - BEGIN");

auto result = select_numbers(client);

for (auto it = result->pbegin(); it != result->pend(); ++it) {
seperator();
for (const auto& row : it->rows()) {
std::cout << *row.get_object<int>(0);
}

std::cout << std::endl;
}

seperator("for_loop() - END");
seperator();
}

void
algorithm_copy(hazelcast_client& client)
{
using hazelcast::client::sql::sql_page;

seperator("algorithm_copy() - BEGIN");

auto result = select_numbers(client);

std::vector<std::shared_ptr<sql_page>> pages;

copy(result->pbegin(), result->pend(), back_inserter(pages));

std::vector<int> numbers;

for (const auto& page : pages) {
transform(
begin(page->rows()),
end(page->rows()),
back_inserter(numbers),
[](const sql_page::sql_row& row) { return *row.get_object<int>(0); });
}

sort(begin(numbers), end(numbers));
copy(begin(numbers),
end(numbers),
std::ostream_iterator<int>(std::cout, "\n"));

seperator("algorithm_copy - END");
}

void
algorithm_filter(hazelcast_client& client)
{
using hazelcast::client::sql::sql_page;

seperator("algorithm_filter - BEGIN");

auto result = select_numbers(client);

std::vector<std::shared_ptr<sql_page>> pages;

copy_if(result->pbegin(),
result->pend(),
back_inserter(pages),
[](const std::shared_ptr<sql_page>& p) {
// Filter out the pages which contains a number which is
// divisable by 20
return any_of(begin(p->rows()),
end(p->rows()),
[](const sql_page::sql_row& row) {
return *row.get_object<int>(0) % 20 == 0;
});
});

for (const auto& page : pages) {
for (const sql_page::sql_row& row : page->rows()) {
std::cout << row.get_object<int>(0) << " ";
}

std::cout << std::endl;
}

seperator("algorithm_filter - END");
}

void
timeout(hazelcast_client& client)
{
seperator("timeout - BEGIN");

// `generate_stream(1)` generates a row per seconds, so it will guaranteed
// that it will timeout
auto result =
client.get_sql().execute("SELECT * FROM TABLE(generate_stream(1))").get();

auto it = result->pbegin(std::chrono::milliseconds{ 1 });

try {
++it;
++it;
} catch (hazelcast::client::exception::no_such_element& e) {
std::cout << "Timedout" << std::endl;
}

seperator("timeout - END");
}
47 changes: 47 additions & 0 deletions hazelcast/include/hazelcast/client/sql/sql_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
#pragma once

#include <iterator>
#include <chrono>

#include <boost/thread/future.hpp>

#include "hazelcast/util/export.h"
Expand Down Expand Up @@ -99,6 +102,46 @@ class HAZELCAST_API sql_result : public std::enable_shared_from_this<sql_result>
std::shared_ptr<sql_page> first_page_;
};

class HAZELCAST_API page_iterator_sync
{
public:
using difference_type = void;
using value_type = std::shared_ptr<sql_page>;
using pointer = std::shared_ptr<sql_page>;
using reference = std::shared_ptr<sql_page>&;
using iterator_category = std::input_iterator_tag;

page_iterator_sync(page_iterator&&, std::chrono::milliseconds timeout);
page_iterator_sync() = default;

void set_timeout(std::chrono::milliseconds);
std::chrono::milliseconds timeout() const;

friend HAZELCAST_API bool operator==(const page_iterator_sync&,
const page_iterator_sync&);
friend HAZELCAST_API bool operator!=(const page_iterator_sync&,
const page_iterator_sync&);

std::shared_ptr<sql_page> operator*();
std::shared_ptr<sql_page> operator->();

page_iterator_sync operator++(int) = delete;
page_iterator_sync& operator++();

private:
struct non_copyables
{
explicit non_copyables(page_iterator&&);

boost::future<std::shared_ptr<sql_page>> preloaded_page_;
page_iterator iter_;
};

std::shared_ptr<non_copyables> block_;
std::shared_ptr<sql_page> current_;
std::chrono::milliseconds timeout_;
};

/**
* The destructor closes the result if it were open.
*/
Expand Down Expand Up @@ -152,6 +195,10 @@ class HAZELCAST_API sql_result : public std::enable_shared_from_this<sql_result>
*/
page_iterator iterator();

page_iterator_sync pbegin(
std::chrono::milliseconds timeout = std::chrono::milliseconds{ -1 });
page_iterator_sync pend();

private:
friend class sql_service;

Expand Down

0 comments on commit fc694a4

Please sign in to comment.