diff --git a/include/boost/compute/algorithm/detail/reduce_by_key.hpp b/include/boost/compute/algorithm/detail/reduce_by_key.hpp new file mode 100644 index 000000000..65844c9eb --- /dev/null +++ b/include/boost/compute/algorithm/detail/reduce_by_key.hpp @@ -0,0 +1,119 @@ +//---------------------------------------------------------------------------// +// Copyright (c) 2015 Jakub Szuppe +// +// Distributed under the Boost Software License, Version 1.0 +// See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt +// +// See http://boostorg.github.com/compute for more information. +//---------------------------------------------------------------------------// + +#ifndef BOOST_COMPUTE_ALGORITHM_DETAIL_REDUCE_BY_KEY_HPP +#define BOOST_COMPUTE_ALGORITHM_DETAIL_REDUCE_BY_KEY_HPP + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace compute { +namespace detail { + +template +size_t reduce_by_key_on_gpu(InputKeyIterator keys_first, + InputKeyIterator keys_last, + InputValueIterator values_first, + OutputKeyIterator keys_result, + OutputValueIterator values_result, + BinaryFunction function, + BinaryPredicate predicate, + command_queue &queue) +{ + return detail::reduce_by_key_with_scan(keys_first, keys_last, values_first, + keys_result, values_result, function, + predicate, queue); +} + +template +bool reduce_by_key_on_gpu_requirements_met(InputKeyIterator keys_first, + InputValueIterator values_first, + OutputKeyIterator keys_result, + OutputValueIterator values_result, + const size_t count, + command_queue &queue) +{ + const device &device = queue.get_device(); + return (count > 256) + && !(device.type() & device::cpu) + && reduce_by_key_with_scan_requirements_met(keys_first, values_first, + keys_result,values_result, + count, queue); + return true; +} + +template +inline std::pair +dispatch_reduce_by_key(InputKeyIterator keys_first, + InputKeyIterator keys_last, + 
InputValueIterator values_first, + OutputKeyIterator keys_result, + OutputValueIterator values_result, + BinaryFunction function, + BinaryPredicate predicate, + command_queue &queue) +{ + typedef typename + std::iterator_traits::difference_type key_difference_type; + typedef typename + std::iterator_traits::difference_type value_difference_type; + + const size_t count = detail::iterator_range_size(keys_first, keys_last); + if (count < 2) { + boost::compute::copy_n(keys_first, count, keys_result, queue); + boost::compute::copy_n(values_first, count, values_result, queue); + return + std::make_pair( + keys_result + static_cast(count), + values_result + static_cast(count) + ); + } + + size_t result_size = 0; + if(reduce_by_key_on_gpu_requirements_met(keys_first, values_first, keys_result, + values_result, count, queue)){ + result_size = + detail::reduce_by_key_on_gpu(keys_first, keys_last, values_first, + keys_result, values_result, function, + predicate, queue); + } + else { + result_size = + detail::serial_reduce_by_key(keys_first, keys_last, values_first, + keys_result, values_result, function, + predicate, queue); + } + + return + std::make_pair( + keys_result + static_cast(result_size), + values_result + static_cast(result_size) + ); +} + +} // end detail namespace +} // end compute namespace +} // end boost namespace + +#endif // BOOST_COMPUTE_ALGORITHM_DETAIL_REDUCE_BY_KEY_HPP diff --git a/include/boost/compute/algorithm/detail/reduce_by_key_with_scan.hpp b/include/boost/compute/algorithm/detail/reduce_by_key_with_scan.hpp new file mode 100644 index 000000000..beb0cc9ee --- /dev/null +++ b/include/boost/compute/algorithm/detail/reduce_by_key_with_scan.hpp @@ -0,0 +1,519 @@ +//---------------------------------------------------------------------------// +// Copyright (c) 2015 Jakub Szuppe +// +// Distributed under the Boost Software License, Version 1.0 +// See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt +// +// See 
http://boostorg.github.com/compute for more information. +//---------------------------------------------------------------------------// + +#ifndef BOOST_COMPUTE_ALGORITHM_DETAIL_REDUCE_BY_KEY_WITH_SCAN_HPP +#define BOOST_COMPUTE_ALGORITHM_DETAIL_REDUCE_BY_KEY_WITH_SCAN_HPP + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace compute { +namespace detail { + +/// \internal_ +/// +/// Fills \p new_keys_first with unsigned integer keys generated from vector +/// of original keys \p keys_first. New keys can be distinguished by simple equality +/// predicate. +/// +/// \param keys_first iterator pointing to the first key +/// \param number_of_keys number of keys +/// \param predicate binary predicate for key comparison +/// \param new_keys_first iterator pointing to the new keys vector +/// \param preferred_work_group_size preferred work group size +/// \param queue command queue to perform the operation +/// +/// Binary function \p predicate must take two keys as arguments and +/// return true only if they are considered the same. +/// +/// The first new key equals zero and the last equals number of unique keys +/// minus one. +/// +/// No local memory usage. 
+template
+inline void generate_uint_keys(InputKeyIterator keys_first,
+                               size_t number_of_keys,
+                               BinaryPredicate predicate,
+                               vector::iterator new_keys_first,
+                               size_t preferred_work_group_size,
+                               command_queue &queue)
+{
+    typedef typename
+        std::iterator_traits::value_type key_type;
+
+    detail::meta_kernel k("reduce_by_key_new_key_flags");
+    k.add_set_arg("count", uint_(number_of_keys));
+
+    // Flag kernel: element 0 gets 0; every other element gets 0 when the
+    // predicate accepts (previous key, this key) and 1 otherwise.
+    k <<
+        k.decl("gid") << " = get_global_id(0);\n" <<
+        k.decl("value") << " = 0;\n" <<
+        "if(gid >= count){\n return;\n}\n" <<
+        "if(gid > 0){ \n" <<
+        k.decl("key") << " = " <<
+                        keys_first[k.var("gid")] << ";\n" <<
+        k.decl("previous_key") << " = " <<
+                        keys_first[k.var("gid - 1")] << ";\n" <<
+        " value = " << predicate(k.var("previous_key"),
+                                 k.var("key")) <<
+                   " ? 0 : 1;\n" <<
+        "}\n else {\n" <<
+        " value = 0;\n" <<
+        "}\n" <<
+        new_keys_first[k.var("gid")] << " = value;\n";
+
+    const context &context = queue.get_context();
+    kernel kernel = k.compile(context);
+
+    size_t work_group_size = preferred_work_group_size;
+    // Round up with integer arithmetic. The previous float-based ceil() lost
+    // precision once number_of_keys exceeded 2^24, which could launch too few
+    // work groups and leave the tail of the input without a flag.
+    size_t work_groups_no =
+        (number_of_keys + work_group_size - 1) / work_group_size;
+
+    queue.enqueue_1d_range_kernel(kernel,
+                                  0,
+                                  work_groups_no * work_group_size,
+                                  work_group_size);
+
+    // Running sum of the flags, written in place over the flags themselves.
+    inclusive_scan(new_keys_first, new_keys_first + number_of_keys,
+                   new_keys_first, queue);
+}
+
+/// \internal_
+/// Calculate carry-out for each work group.
+/// Carry-out is a pair of the last key processed by a work group and sum of all
+/// values under this key in this work group.
+template +inline void carry_outs(vector::iterator keys_first, + InputValueIterator values_first, + size_t count, + vector::iterator carry_out_keys_first, + OutputValueIterator carry_out_values_first, + BinaryFunction function, + size_t work_group_size, + command_queue &queue) +{ + typedef typename + std::iterator_traits::value_type value_out_type; + + detail::meta_kernel k("reduce_by_key_with_scan_carry_outs"); + k.add_set_arg("count", uint_(count)); + size_t local_keys_arg = k.add_arg(memory_object::local_memory, "lkeys"); + size_t local_vals_arg = k.add_arg(memory_object::local_memory, "lvals"); + + k << + k.decl("gid") << " = get_global_id(0);\n" << + k.decl("wg_size") << " = get_local_size(0);\n" << + k.decl("lid") << " = get_local_id(0);\n" << + k.decl("group_id") << " = get_group_id(0);\n" << + + "if(gid >= count){\n return;\n}\n" << + + k.decl("key") << " = " << keys_first[k.var("gid")] << ";\n" << + k.decl("value") << " = " << values_first[k.var("gid")] << ";\n" << + "lkeys[lid] = key;\n" << + "lvals[lid] = value;\n" << + + // Calculate carry out for each work group by performing Hillis/Steele scan + // where only last element (key-value pair) is saved + k.decl("result") << " = value;\n" << + k.decl("other_key") << ";\n" << + k.decl("other_value") << ";\n" << + + "for(" << k.decl("offset") << " = 1; " << + "offset < wg_size && lid >= offset; offset *= 2){\n" + " barrier(CLK_LOCAL_MEM_FENCE);\n" << + " other_key = lkeys[lid - offset];\n" << + " if(other_key == key){\n" << + " other_value = lvals[lid - offset];\n" << + " result = " << function(k.var("result"), + k.var("other_value")) << ";\n" << + " }\n" << + " barrier(CLK_LOCAL_MEM_FENCE);\n" << + " lvals[lid] = result;\n" << + "}\n" << + + // save carry out + "if(lid == (wg_size - 1)){\n" << + carry_out_keys_first[k.var("group_id")] << " = key;\n" << + carry_out_values_first[k.var("group_id")] << " = result;\n" << + "}\n"; + + size_t work_groups_no = static_cast( + std::ceil(float(count) / work_group_size) 
+ ); + + const context &context = queue.get_context(); + kernel kernel = k.compile(context); + kernel.set_arg(local_keys_arg, local_buffer(work_group_size)); + kernel.set_arg(local_vals_arg, local_buffer(work_group_size)); + + queue.enqueue_1d_range_kernel(kernel, + 0, + work_groups_no * work_group_size, + work_group_size); +} + +/// \internal_ +/// Calculate carry-in by performing inclusive scan by key on carry-outs vector. +template +inline void carry_ins(vector::iterator carry_out_keys_first, + OutputValueIterator carry_out_values_first, + OutputValueIterator carry_in_values_first, + size_t carry_out_size, + BinaryFunction function, + size_t work_group_size, + command_queue &queue) +{ + typedef typename + std::iterator_traits::value_type value_out_type; + + uint_ values_pre_work_item = static_cast( + std::ceil(float(carry_out_size) / work_group_size) + ); + + detail::meta_kernel k("reduce_by_key_with_scan_carry_ins"); + k.add_set_arg("carry_out_size", uint_(carry_out_size)); + k.add_set_arg("values_per_work_item", values_pre_work_item); + size_t local_keys_arg = k.add_arg(memory_object::local_memory, "lkeys"); + size_t local_vals_arg = k.add_arg(memory_object::local_memory, "lvals"); + + k << + k.decl("id") << " = get_global_id(0) * values_per_work_item;\n" << + k.decl("idx") << " = id;\n" << + k.decl("wg_size") << " = get_local_size(0);\n" << + k.decl("lid") << " = get_local_id(0);\n" << + k.decl("group_id") << " = get_group_id(0);\n" << + + "if(id >= carry_out_size){\n return;\n}\n" << + + k.decl("key") << ";\n" << + k.decl("value") << ";\n" << + k.decl("previous_key") << " = " << carry_out_keys_first[k.var("idx")] << ";\n" << + k.decl("result") << " = " << carry_out_values_first[k.var("idx")] << ";\n" << + carry_in_values_first[k.var("idx")] << " = result;\n" << + + k.decl("end") << " = (id + values_per_work_item) <= carry_out_size" << + " ? 
(values_per_work_item + id) : carry_out_size;\n" << + + "for(idx = idx + 1; idx < end; idx += 1){\n" << + " key = " << carry_out_keys_first[k.var("idx")] << ";\n" << + " value = " << carry_out_values_first[k.var("idx")] << ";\n" << + " if(previous_key == key){\n" << + " result = " << function(k.var("result"), + k.var("value")) << ";\n" << + " }\n else { \n" << + " result = value;\n" + " }\n" << + " " << carry_in_values_first[k.var("idx")] << " = result;\n" << + " previous_key = key;\n" + "}\n" << + + // save the last key and result to local memory + "lkeys[lid] = previous_key;\n" << + "lvals[lid] = result;\n" << + + // Hillis/Steele scan + "for(" << k.decl("offset") << " = 1; " << + "offset < wg_size && lid >= offset; offset *= 2){\n" + " barrier(CLK_LOCAL_MEM_FENCE);\n" << + " key = lkeys[lid - offset];\n" << + " if(previous_key == key){\n" << + " value = lvals[lid - offset];\n" << + " result = " << function(k.var("result"), + k.var("value")) << ";\n" << + " }\n" << + " barrier(CLK_LOCAL_MEM_FENCE);\n" << + " lvals[lid] = result;\n" << + "}\n" << + "barrier(CLK_LOCAL_MEM_FENCE);\n" << + + // first in the group has nothing to do + "if(lid == 0){\n return;\n}\n" << + + // load key-value reduced by previous work item + "previous_key = lkeys[lid - 1];\n" << + "result = lvals[lid - 1];\n" << + + // make sure all carry-ins are saved in global memory + "barrier( CLK_GLOBAL_MEM_FENCE );\n" << + + // add key-value reduced by previous work item + "for(idx = id; idx < end; idx += 1){\n" << + " key = " << carry_out_keys_first[k.var("idx")] << ";\n" << + " value = " << carry_in_values_first[k.var("idx")] << ";\n" << + " if(previous_key == key){\n" << + " value = " << function(k.var("result"), + k.var("value")) << ";\n" << + " }\n" << + " " << carry_in_values_first[k.var("idx")] << " = value;\n" << + "}\n"; + + + const context &context = queue.get_context(); + kernel kernel = k.compile(context); + kernel.set_arg(local_keys_arg, local_buffer(work_group_size)); + 
kernel.set_arg(local_vals_arg, local_buffer(work_group_size)); + + queue.enqueue_1d_range_kernel(kernel, + 0, + work_group_size, + work_group_size); +} + +/// \internal_ +/// +/// Perform final reduction by key. Each work item: +/// 1. Perform local work-group reduction (Hillis/Steele scan) +/// 2. Add carry-in (if keys are right) +/// 3. Save reduced value if next key is different than processed one +template +inline void final_reduction(InputKeyIterator keys_first, + InputValueIterator values_first, + OutputKeyIterator keys_result, + OutputValueIterator values_result, + size_t count, + BinaryFunction function, + vector::iterator new_keys_first, + vector::iterator carry_in_keys_first, + OutputValueIterator carry_in_values_first, + size_t carry_in_size, + size_t work_group_size, + command_queue &queue) +{ + typedef typename + std::iterator_traits::value_type value_out_type; + + detail::meta_kernel k("reduce_by_key_with_scan_final_reduction"); + k.add_set_arg("count", uint_(count)); + size_t local_keys_arg = k.add_arg(memory_object::local_memory, "lkeys"); + size_t local_vals_arg = k.add_arg(memory_object::local_memory, "lvals"); + + k << + k.decl("gid") << " = get_global_id(0);\n" << + k.decl("wg_size") << " = get_local_size(0);\n" << + k.decl("lid") << " = get_local_id(0);\n" << + k.decl("group_id") << " = get_group_id(0);\n" << + + "if(gid >= count){\n return;\n}\n" << + + k.decl("key") << " = " << new_keys_first[k.var("gid")] << ";\n" << + k.decl("value") << " = " << values_first[k.var("gid")] << ";\n" << + "lkeys[lid] = key;\n" << + "lvals[lid] = value;\n" << + + // Hillis/Steele scan + k.decl("result") << " = value;\n" << + k.decl("other_key") << ";\n" << + k.decl("other_value") << ";\n" << + + "for(" << k.decl("offset") << " = 1; " << + "offset < wg_size && lid >= offset; offset *= 2){\n" + " barrier(CLK_LOCAL_MEM_FENCE);\n" << + " other_key = lkeys[lid - offset];\n" << + " if(other_key == key){\n" << + " other_value = lvals[lid - offset];\n" << + " result = 
" << function(k.var("result"), + k.var("other_value")) << ";\n" << + " }\n" << + " barrier(CLK_LOCAL_MEM_FENCE);\n" << + " lvals[lid] = result;\n" << + "}\n" << + + k.decl("save") << " = (gid < (count - 1)) ?" + << new_keys_first[k.var("gid + 1")] << " != key" << + ": true;\n" << + + // Add carry in + k.decl("carry_in_key") << ";\n" << + "if(group_id > 0 && save) {\n" << + " carry_in_key = " << carry_in_keys_first[k.var("group_id - 1")] << ";\n" << + " if(key == carry_in_key){\n" << + " other_value = " << carry_in_values_first[k.var("group_id - 1")] << ";\n" << + " result = " << function(k.var("result"), + k.var("other_value")) << ";\n" << + " }\n" << + "}\n" << + + // Save result only if the next key is different or it's the last element. + "if(save){\n" << + keys_result[k.var("key")] << " = " << keys_first[k.var("gid")] << ";\n" << + values_result[k.var("key")] << " = result;\n" << + "}\n" + ; + + size_t work_groups_no = static_cast( + std::ceil(float(count) / work_group_size) + ); + + const context &context = queue.get_context(); + kernel kernel = k.compile(context); + kernel.set_arg(local_keys_arg, local_buffer(work_group_size)); + kernel.set_arg(local_vals_arg, local_buffer(work_group_size)); + + queue.enqueue_1d_range_kernel(kernel, + 0, + work_groups_no * work_group_size, + work_group_size); +} + +/// \internal_ +/// Returns preferred work group size for reduce by key with scan algorithm. +template +inline size_t get_work_group_size(const device& device) +{ + std::string cache_key = std::string("__boost_reduce_by_key_with_scan") + + "k_" + type_name() + "_v_" + type_name(); + + // load parameters + boost::shared_ptr parameters = + detail::parameter_cache::get_global_cache(device); + + return (std::max)( + static_cast(parameters->get(cache_key, "wgsize", 256)), + static_cast(device.get_info()) + ); +} + +/// \internal_ +/// +/// 1. For each work group carry-out value is calculated (it's done by key-oriented +/// Hillis/Steele scan). 
Carry-out is a pair of the last key processed by work
+/// group and sum of all values under this key in work group.
+/// 2. From every carry-out carry-in is calculated by performing inclusive scan
+/// by key.
+/// 3. Final reduction by key is performed (key-oriented Hillis/Steele scan),
+/// carry-in values are added where needed.
+///
+/// Returns the number of reduced key/value pairs, or zero for an empty input.
+template
+inline size_t reduce_by_key_with_scan(InputKeyIterator keys_first,
+                                      InputKeyIterator keys_last,
+                                      InputValueIterator values_first,
+                                      OutputKeyIterator keys_result,
+                                      OutputValueIterator values_result,
+                                      BinaryFunction function,
+                                      BinaryPredicate predicate,
+                                      command_queue &queue)
+{
+    typedef typename
+        std::iterator_traits::value_type value_type;
+    typedef typename
+        std::iterator_traits::value_type key_type;
+    typedef typename
+        std::iterator_traits::value_type value_out_type;
+
+    const context &context = queue.get_context();
+    size_t count = detail::iterator_range_size(keys_first, keys_last);
+
+    if(count == 0){
+        return size_t(0);
+    }
+
+    const device &device = queue.get_device();
+    size_t work_group_size = get_work_group_size(device);
+
+    // Replace original key with unsigned integer keys generated based on given
+    // predicate. New key is also an index for keys_result and values_result vectors,
+    // which points to place where reduced value should be saved.
+    vector new_keys(count, context);
+    vector::iterator new_keys_first = new_keys.begin();
+    generate_uint_keys(keys_first, count, predicate, new_keys_first,
+                       work_group_size, queue);
+
+    // Calculate carry-out and carry-in vectors size: one entry per work group.
+    // Integer ceiling division is used because a float-based ceil() is
+    // inexact once count exceeds 2^24, which could undersize these vectors.
+    const size_t carry_out_size =
+        (count + work_group_size - 1) / work_group_size;
+    vector carry_out_keys(carry_out_size, context);
+    vector carry_out_values(carry_out_size, context);
+    carry_outs(new_keys_first, values_first, count, carry_out_keys.begin(),
+               carry_out_values.begin(), function, work_group_size, queue);
+
+    vector carry_in_values(carry_out_size, context);
+    carry_ins(carry_out_keys.begin(), carry_out_values.begin(),
+              carry_in_values.begin(), carry_out_size, function, work_group_size,
+              queue);
+
+    final_reduction(keys_first, values_first, keys_result, values_result,
+                    count, function, new_keys_first, carry_out_keys.begin(),
+                    carry_in_values.begin(), carry_out_size, work_group_size,
+                    queue);
+
+    // The last generated key is the index of the last output slot, so the
+    // number of results is that key plus one.
+    const size_t result = read_single_value(new_keys.get_buffer(),
+                                            count - 1, queue);
+    return result + 1;
+}
+
+/// \internal_
+/// Return true if requirements for running reduce by key with scan on given
+/// device are met (at least one work group of preferred size can be run).
+template
+bool reduce_by_key_with_scan_requirements_met(InputKeyIterator keys_first,
+                                              InputValueIterator values_first,
+                                              OutputKeyIterator keys_result,
+                                              OutputValueIterator values_result,
+                                              const size_t count,
+                                              command_queue &queue)
+{
+    typedef typename
+        std::iterator_traits::value_type value_type;
+    typedef typename
+        std::iterator_traits::value_type key_type;
+    typedef typename
+        std::iterator_traits::value_type value_out_type;
+
+    // The iterators are not inspected at run time; the casts mark them as
+    // intentionally unused. \p count is not consulted by this check either.
+    (void) keys_first;
+    (void) values_first;
+    (void) keys_result;
+    (void) values_result;
+
+    const device &device = queue.get_device();
+    // device must have dedicated local memory storage
+    if(device.get_info() != CL_LOCAL)
+    {
+        return false;
+    }
+
+    // local memory size in bytes (per compute unit)
+    const size_t local_mem_size = device.get_info();
+
+    // preferred work group size
+    size_t work_group_size = get_work_group_size(device);
+
+    // local memory size needed to perform parallel reduction
+    size_t required_local_mem_size = 0;
+    // keys size
+    required_local_mem_size += sizeof(uint_) * work_group_size;
+    // reduced values size
+    required_local_mem_size += sizeof(value_out_type) * work_group_size;
+
+    // Both per-work-group buffers (keys and values) must fit in local memory.
+    return (required_local_mem_size <= local_mem_size);
+}
+
+} // end detail namespace
+} // end compute namespace
+} // end boost namespace
+
+#endif // BOOST_COMPUTE_ALGORITHM_DETAIL_REDUCE_BY_KEY_WITH_SCAN_HPP
diff --git a/include/boost/compute/algorithm/detail/serial_reduce_by_key.hpp b/include/boost/compute/algorithm/detail/serial_reduce_by_key.hpp
new file mode 100644
index 000000000..f9bda8e47
--- /dev/null
+++ b/include/boost/compute/algorithm/detail/serial_reduce_by_key.hpp
@@ -0,0 +1,108 @@
+//---------------------------------------------------------------------------//
+// Copyright (c) 2015 Jakub Szuppe
+//
+// Distributed under the Boost Software License, Version 1.0
+// See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt
+//
+// See http://boostorg.github.com/compute for more
information.
+//---------------------------------------------------------------------------//
+
+#ifndef BOOST_COMPUTE_ALGORITHM_DETAIL_SERIAL_REDUCE_BY_KEY_HPP
+#define BOOST_COMPUTE_ALGORITHM_DETAIL_SERIAL_REDUCE_BY_KEY_HPP
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace boost {
+namespace compute {
+namespace detail {
+
+/// \internal_
+/// Serial reduce by key: one kernel, enqueued as a single task, walks the
+/// input once and writes a key/value pair for every run of keys accepted by
+/// \p predicate.
+///
+/// Returns the number of pairs written; an empty input returns zero without
+/// launching a kernel.
+template
+inline size_t serial_reduce_by_key(InputKeyIterator keys_first,
+                                   InputKeyIterator keys_last,
+                                   InputValueIterator values_first,
+                                   OutputKeyIterator keys_result,
+                                   OutputValueIterator values_result,
+                                   BinaryFunction function,
+                                   BinaryPredicate predicate,
+                                   command_queue &queue)
+{
+    typedef typename
+        std::iterator_traits::value_type value_type;
+    typedef typename
+        std::iterator_traits::value_type key_type;
+    typedef typename
+        ::boost::compute::result_of::type result_type;
+
+    const context &context = queue.get_context();
+    size_t count = detail::iterator_range_size(keys_first, keys_last);
+    if(count < 1){
+        return count;
+    }
+
+    meta_kernel k("serial_reduce_by_key");
+    size_t count_arg = k.add_arg("count");
+    size_t result_size_arg = k.add_arg(memory_object::global_memory,
+                                       "result_size");
+
+    convert to_result_type;
+
+    // "size" counts the runs seen so far; the pair for the current run lives
+    // at index size - 1 and is rewritten whenever a run ends and once more
+    // after the loop.
+    k <<
+        k.decl("result") <<
+            " = " << to_result_type(values_first[0]) << ";\n" <<
+        k.decl("previous_key") << " = " << keys_first[0] << ";\n" <<
+        k.decl("value") << ";\n" <<
+        k.decl("key") << ";\n" <<
+
+        k.decl("size") << " = 1;\n" <<
+
+        keys_result[0] << " = previous_key;\n" <<
+        values_result[0] << " = result;\n" <<
+
+        "for(ulong i = 1; i < count; i++) {\n" <<
+        " value = " << to_result_type(values_first[k.var("i")]) << ";\n" <<
+        " key = " << keys_first[k.var("i")] << ";\n" <<
+        " if (" << predicate(k.var("previous_key"),
+                             k.var("key")) << ") {\n" <<
+
+        " result = " << function(k.var("result"),
+                                 k.var("value")) << ";\n" <<
+        " }\n " <<
+        " else { \n" <<
+        keys_result[k.var("size - 1")] << " = previous_key;\n" <<
+        values_result[k.var("size - 1")] << " = result;\n" <<
+        " result = value;\n" <<
+        " size++;\n" <<
+        " } \n" <<
+        " previous_key = key;\n" <<
+        "}\n" <<
+        keys_result[k.var("size - 1")] << " = previous_key;\n" <<
+        values_result[k.var("size - 1")] << " = result;\n" <<
+        "*result_size = size;";
+
+    kernel kernel = k.compile(context);
+
+    // Device-side scalar that receives the number of pairs written.
+    scalar result_size(context);
+    kernel.set_arg(result_size_arg, result_size.get_buffer());
+    kernel.set_arg(count_arg, static_cast(count));
+
+    queue.enqueue_task(kernel);
+
+    return static_cast(result_size.read(queue));
+}
+
+} // end detail namespace
+} // end compute namespace
+} // end boost namespace
+
+#endif // BOOST_COMPUTE_ALGORITHM_DETAIL_SERIAL_REDUCE_BY_KEY_HPP
diff --git a/include/boost/compute/algorithm/reduce_by_key.hpp b/include/boost/compute/algorithm/reduce_by_key.hpp
new file mode 100644
index 000000000..87c73e887
--- /dev/null
+++ b/include/boost/compute/algorithm/reduce_by_key.hpp
@@ -0,0 +1,118 @@
+//---------------------------------------------------------------------------//
+// Copyright (c) 2015 Jakub Szuppe
+//
+// Distributed under the Boost Software License, Version 1.0
+// See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt
+//
+// See http://boostorg.github.com/compute for more information.
+//---------------------------------------------------------------------------//
+
+#ifndef BOOST_COMPUTE_ALGORITHM_REDUCE_BY_KEY_HPP
+#define BOOST_COMPUTE_ALGORITHM_REDUCE_BY_KEY_HPP
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+namespace boost {
+namespace compute {
+
+/// The \c reduce_by_key() algorithm performs reduction for each contiguous
+/// subsequence of values determined by equivalent keys.
+///
+/// Returns a pair of iterators at the end of the ranges [\p keys_result, keys_result_last)
+/// and [\p values_result, \p values_result_last).
+///
+/// If no function is specified, \c plus will be used.
+/// If no predicate is specified, \c equal_to will be used.
+///
+/// \param keys_first the first key
+/// \param keys_last the last key
+/// \param values_first the first input value
+/// \param keys_result iterator pointing to the key output
+/// \param values_result iterator pointing to the reduced value output
+/// \param function binary reduction function
+/// \param predicate binary predicate which returns true only if two keys are equal
+/// \param queue command queue to perform the operation
+///
+/// The \c reduce_by_key() algorithm assumes that the binary reduction function
+/// is associative. When used with non-associative functions the result may
+/// be non-deterministic and vary in precision. Notably this affects the
+/// \c plus() function as floating-point addition is not associative
+/// and may produce slightly different results than a serial algorithm.
+///
+/// For example, to calculate the sum of the values for each key:
+///
+/// \snippet test/test_reduce_by_key.cpp reduce_by_key_int
+///
+/// \see reduce()
+template
+inline std::pair
+reduce_by_key(InputKeyIterator keys_first,
+              InputKeyIterator keys_last,
+              InputValueIterator values_first,
+              OutputKeyIterator keys_result,
+              OutputValueIterator values_result,
+              BinaryFunction function,
+              BinaryPredicate predicate,
+              command_queue &queue = system::default_queue())
+{
+    // All work happens in the detail dispatcher.
+    return detail::dispatch_reduce_by_key(keys_first, keys_last, values_first,
+                                          keys_result, values_result,
+                                          function, predicate,
+                                          queue);
+}
+
+/// \overload
+template
+inline std::pair
+reduce_by_key(InputKeyIterator keys_first,
+              InputKeyIterator keys_last,
+              InputValueIterator values_first,
+              OutputKeyIterator keys_result,
+              OutputValueIterator values_result,
+              BinaryFunction function,
+              command_queue &queue = system::default_queue())
+{
+    typedef typename std::iterator_traits::value_type key_type;
+
+    // No predicate given: keys are compared with equal_to.
+    return reduce_by_key(keys_first, keys_last, values_first,
+                         keys_result, values_result,
+                         function, equal_to(),
+                         queue);
+}
+
+/// \overload
+template
+inline std::pair
+reduce_by_key(InputKeyIterator keys_first,
+              InputKeyIterator keys_last,
+              InputValueIterator values_first,
+              OutputKeyIterator keys_result,
+              OutputValueIterator values_result,
+              command_queue &queue = system::default_queue())
+{
+    typedef typename std::iterator_traits::value_type key_type;
+    typedef typename std::iterator_traits::value_type value_type;
+
+    // Neither function nor predicate given: reduce with plus, compare keys
+    // with equal_to.
+    return reduce_by_key(keys_first, keys_last, values_first,
+                         keys_result, values_result,
+                         plus(), equal_to(),
+                         queue);
+}
+
+} // end compute namespace
+} // end boost namespace
+
+#endif // BOOST_COMPUTE_ALGORITHM_REDUCE_BY_KEY_HPP
diff --git a/perf/CMakeLists.txt b/perf/CMakeLists.txt
index 726e66d2f..8afdcc2ab 100644
--- a/perf/CMakeLists.txt
+++ b/perf/CMakeLists.txt
@@ -51,6 +51,7 @@ set(BENCHMARKS
     rotate_copy
     host_sort
     random_number_engine
+    reduce_by_key
     saxpy
     search
     search_n
@@ -132,6 +133,7 @@ if(${BOOST_COMPUTE_HAVE_CUDA})
     thrust_merge
     thrust_partial_sum
     thrust_partition
+    thrust_reduce_by_key
     thrust_reverse
     thrust_reverse_copy
     thrust_rotate
@@ -180,6 +182,7 @@ if(${BOOST_COMPUTE_HAVE_BOLT} AND ${BOOST_COMPUTE_USE_CPP11})
     bolt_max_element
     bolt_merge
     bolt_partial_sum
+    bolt_reduce_by_key
     bolt_saxpy
     bolt_sort
   )
diff --git a/perf/perf.py b/perf/perf.py
index f3ff1fee2..984535e1f 100755
--- a/perf/perf.py
+++ b/perf/perf.py
@@ -123,6 +123,7 @@ def run_benchmark(name, sizes, vs=[]):
             "merge",
             "partial_sum",
             "partition",
+            "reduce_by_key",
             "reverse",
             "reverse_copy",
             "rotate",
@@ -139,6 +140,7 @@ def run_benchmark(name, sizes, vs=[]):
             "max_element",
             "merge",
             "partial_sum",
+            "reduce_by_key",
             "saxpy",
             "sort"
         ],
diff --git a/perf/perf_bolt_reduce_by_key.cpp b/perf/perf_bolt_reduce_by_key.cpp
new file mode 100644
index 000000000..e76684985
--- /dev/null
+++ b/perf/perf_bolt_reduce_by_key.cpp
@@ -0,0 +1,100 @@
+//---------------------------------------------------------------------------//
+// Copyright (c) 2015 Jakub Szuppe
+//
+// Distributed under the Boost Software License, Version 1.0
+// See accompanying
file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt +// +// See http://boostorg.github.com/compute for more information. +//---------------------------------------------------------------------------// + +#include +#include +#include + +#include +#include +#include + +#include "perf.hpp" + +int rand_int() +{ + return static_cast((rand() / double(RAND_MAX)) * 25.0); +} + +struct unique_key { + int current; + int avgValuesNoPerKey; + + unique_key() + { + current = 0; + avgValuesNoPerKey = 512; + } + + int operator()() + { + double p = double(1.0) / static_cast(avgValuesNoPerKey); + if((rand() / double(RAND_MAX)) <= p) + return ++current; + return current; + } +} UniqueKey; + +int main(int argc, char *argv[]) +{ + perf_parse_args(argc, argv); + + std::cout << "size: " << PERF_N << std::endl; + + bolt::cl::control ctrl = bolt::cl::control::getDefault(); + ::cl::Device device = ctrl.getDevice(); + std::cout << "device: " << device.getInfo() << std::endl; + + // create vector of keys and random values + std::vector host_keys(PERF_N); + std::vector host_values(PERF_N); + std::generate(host_keys.begin(), host_keys.end(), UniqueKey); + std::generate(host_values.begin(), host_values.end(), rand_int); + + // create device vectors for data + bolt::cl::device_vector device_keys(PERF_N); + bolt::cl::device_vector device_values(PERF_N); + + // transfer data to the device + bolt::cl::copy(host_keys.begin(), host_keys.end(), device_keys.begin()); + bolt::cl::copy(host_values.begin(), host_values.end(), device_values.begin()); + + // create device vectors for the results + bolt::cl::device_vector device_keys_results(PERF_N); + bolt::cl::device_vector device_values_results(PERF_N); + + typedef bolt::cl::device_vector::iterator iterType; + bolt::cl::pair result = { + device_keys_results.begin(), + device_values_results.begin() + }; + + perf_timer t; + for(size_t trial = 0; trial < PERF_TRIALS; trial++){ + t.start(); + result = 
bolt::cl::reduce_by_key(device_keys.begin(), + device_keys.end(), + device_values.begin(), + device_keys_results.begin(), + device_values_results.begin()); + t.stop(); + } + std::cout << "time: " << t.min_time() / 1e6 << " ms" << std::endl; + + size_t result_size = bolt::cl::distance(device_keys_results.begin(), result.first); + if(result_size != static_cast(host_keys[PERF_N-1] + 1)){ + std::cout << "ERROR: " + << "wrong number of keys" + << std::endl; + return -1; + } + + return 0; +} diff --git a/perf/perf_reduce_by_key.cpp b/perf/perf_reduce_by_key.cpp new file mode 100644 index 000000000..c88d450e9 --- /dev/null +++ b/perf/perf_reduce_by_key.cpp @@ -0,0 +1,114 @@ +//---------------------------------------------------------------------------// +// Copyright (c) 2015 Jakub Szuppe +// +// Distributed under the Boost Software License, Version 1.0 +// See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt +// +// See http://boostorg.github.com/compute for more information. 
+//---------------------------------------------------------------------------//
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+#include "perf.hpp"
+// Returns a pseudo-random value in [0, 25] (25 only when rand() == RAND_MAX).
+int rand_int()
+{
+    return static_cast<int>((rand() / double(RAND_MAX)) * 25.0);
+}
+// Key generator: each call bumps the key with probability 1/avgValuesNoPerKey, else repeats it.
+struct unique_key {
+    int current;
+    int avgValuesNoPerKey;
+
+    unique_key()
+    {
+        current = 0;
+        avgValuesNoPerKey = 512;
+    }
+
+    int operator()()
+    {
+        double p = double(1.0) / static_cast<double>(avgValuesNoPerKey);
+        if((rand() / double(RAND_MAX)) <= p)
+            return ++current; // may already fire on the first call, so the first key is 0 or 1
+        return current;
+    }
+} UniqueKey;
+// Times boost::compute::reduce_by_key on PERF_N key/value pairs and checks the number of output keys.
+int main(int argc, char *argv[])
+{
+    perf_parse_args(argc, argv);
+
+    std::cout << "size: " << PERF_N << std::endl;
+
+    // setup context and queue for the default device
+    boost::compute::device device = boost::compute::system::default_device();
+    boost::compute::context context(device);
+    boost::compute::command_queue queue(context, device);
+    std::cout << "device: " << device.name() << std::endl;
+
+    // create vector of keys and random values
+    std::vector<int> host_keys(PERF_N);
+    std::vector<int> host_values(PERF_N);
+    std::generate(host_keys.begin(), host_keys.end(), UniqueKey);
+    std::generate(host_values.begin(), host_values.end(), rand_int);
+
+    // create vectors for keys and values on the device and copy the data
+    boost::compute::vector<int> device_keys(PERF_N, context);
+    boost::compute::vector<int> device_values(PERF_N,context);
+    boost::compute::copy(
+        host_keys.begin(),
+        host_keys.end(),
+        device_keys.begin(),
+        queue
+    );
+    boost::compute::copy(
+        host_values.begin(),
+        host_values.end(),
+        device_values.begin(),
+        queue
+    );
+
+    // vectors for the results
+    boost::compute::vector<int> device_keys_results(PERF_N, context);
+    boost::compute::vector<int> device_values_results(PERF_N,context);
+
+    typedef boost::compute::vector<int>::iterator iterType;
+    std::pair<iterType, iterType> result(
+        device_keys_results.begin(),
+        device_values_results.begin()
+    );
+
+    // reduce by key
+    perf_timer t;
+    for(size_t trial = 0; trial < PERF_TRIALS; trial++){
+        t.start();
+        result = boost::compute::reduce_by_key(device_keys.begin(),
+                                               device_keys.end(),
+                                               device_values.begin(),
+                                               device_keys_results.begin(),
+                                               device_values_results.begin(),
+                                               queue);
+        t.stop();
+    }
+    std::cout << "time: " << t.min_time() / 1e6 << " ms" << std::endl;
+    // keys rise in steps of 0 or 1 from host_keys[0] (0 or 1): number of runs = last - first + 1
+    size_t result_size = std::distance(device_keys_results.begin(), result.first);
+    if(result_size != static_cast<size_t>(host_keys[PERF_N-1] - host_keys[0] + 1)){
+        std::cout << "ERROR: "
+                  << "wrong number of keys: got " << result_size << ", expected " << (host_keys[PERF_N-1] - host_keys[0] + 1)
+                  << std::endl;
+        return -1;
+    }
+
+    return 0;
+}
diff --git a/perf/perf_thrust_reduce_by_key.cu b/perf/perf_thrust_reduce_by_key.cu
new file mode 100644
index 000000000..4266cb273
--- /dev/null
+++ b/perf/perf_thrust_reduce_by_key.cu
@@ -0,0 +1,91 @@
+//---------------------------------------------------------------------------//
+// Copyright (c) 2015 Jakub Szuppe
+//
+// Distributed under the Boost Software License, Version 1.0
+// See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt
+//
+// See http://boostorg.github.com/compute for more information.
+//---------------------------------------------------------------------------//
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+#include "perf.hpp"
+// Returns a pseudo-random value in [0, 25] (25 only when rand() == RAND_MAX).
+int rand_int()
+{
+    return static_cast<int>((rand() / double(RAND_MAX)) * 25.0);
+}
+// Key generator: each call bumps the key with probability 1/avgValuesNoPerKey, else repeats it.
+struct unique_key {
+    int current;
+    int avgValuesNoPerKey;
+
+    unique_key()
+    {
+        current = 0;
+        avgValuesNoPerKey = 512;
+    }
+
+    int operator()()
+    {
+        double p = double(1.0) / static_cast<double>(avgValuesNoPerKey);
+        if((rand() / double(RAND_MAX)) <= p)
+            return ++current; // may already fire on the first call, so the first key is 0 or 1
+        return current;
+    }
+} UniqueKey;
+// Times thrust::reduce_by_key on PERF_N key/value pairs and checks the number of output keys.
+int main(int argc, char *argv[])
+{
+    perf_parse_args(argc, argv);
+
+    std::cout << "size: " << PERF_N << std::endl;
+
+    // create vector of keys and random values
+    thrust::host_vector<int> host_keys(PERF_N);
+    thrust::host_vector<int> host_values(PERF_N);
+    std::generate(host_keys.begin(), host_keys.end(), UniqueKey);
+    std::generate(host_values.begin(), host_values.end(), rand_int);
+
+    // transfer data to the device
+    thrust::device_vector<int> device_keys = host_keys;
+    thrust::device_vector<int> device_values = host_values;
+
+    // create device vectors for the results
+    thrust::device_vector<int> device_keys_results(PERF_N);
+    thrust::device_vector<int> device_values_results(PERF_N);
+
+    typedef thrust::device_vector<int>::iterator iterType; // non-dependent type: no 'typename' outside a template
+    thrust::pair<iterType, iterType> result;
+
+    perf_timer t;
+    for(size_t trial = 0; trial < PERF_TRIALS; trial++){
+        t.start();
+        result = thrust::reduce_by_key(device_keys.begin(),
+                                       device_keys.end(),
+                                       device_values.begin(),
+                                       device_keys_results.begin(),
+                                       device_values_results.begin());
+        cudaDeviceSynchronize();
+        t.stop();
+    }
+    std::cout << "time: " << t.min_time() / 1e6 << " ms" << std::endl;
+    // keys rise in steps of 0 or 1 from host_keys[0] (0 or 1): number of runs = last - first + 1
+    size_t result_size = thrust::distance(device_keys_results.begin(), result.first);
+    if(result_size != static_cast<size_t>(host_keys[PERF_N-1] - host_keys[0] + 1)){
+        std::cout << "ERROR: "
+                  << "wrong number of keys"
+                  << std::endl;
+        return -1;
+    }
+
+    return 0;
+}
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index
ff2532cbd..3c23853b7 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -99,6 +99,7 @@ add_compute_test("algorithm.radix_sort" test_radix_sort.cpp)
 add_compute_test("algorithm.random_fill" test_random_fill.cpp)
 add_compute_test("algorithm.random_shuffle" test_random_shuffle.cpp)
 add_compute_test("algorithm.reduce" test_reduce.cpp)
+add_compute_test("algorithm.reduce_by_key" test_reduce_by_key.cpp)
 add_compute_test("algorithm.remove" test_remove.cpp)
 add_compute_test("algorithm.replace" test_replace.cpp)
 add_compute_test("algorithm.reverse" test_reverse.cpp)
diff --git a/test/test_reduce_by_key.cpp b/test/test_reduce_by_key.cpp
new file mode 100644
index 000000000..50c0c6340
--- /dev/null
+++ b/test/test_reduce_by_key.cpp
@@ -0,0 +1,218 @@
+//---------------------------------------------------------------------------//
+// Copyright (c) 2015 Jakub Szuppe
+//
+// Distributed under the Boost Software License, Version 1.0
+// See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt
+//
+// See http://boostorg.github.com/compute for more information.
+//---------------------------------------------------------------------------//
+
+#define BOOST_TEST_MODULE TestReduceByKey
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "check_macros.hpp"
+#include "context_setup.hpp"
+
+namespace bc = boost::compute;
+
+// Basic case: values are summed over each run of equal consecutive keys (body kept unindented between the //! markers).
+BOOST_AUTO_TEST_CASE(reduce_by_key_int)
+{
+//! [reduce_by_key_int]
+// setup keys and values
+int keys[] = { 0, 2, -3, -3, -3, -3, -3, 4 };
+int data[] = { 1, 1, 1, 1, 1, 2, 5, 1 };
+
+boost::compute::vector<int> keys_input(keys, keys + 8, queue);
+boost::compute::vector<int> values_input(data, data + 8, queue);
+
+boost::compute::vector<int> keys_output(8, context);
+boost::compute::vector<int> values_output(8, context);
+// reduce by key
+boost::compute::reduce_by_key(keys_input.begin(), keys_input.end(), values_input.begin(),
+                              keys_output.begin(), values_output.begin(), queue);
+// keys_output = { 0, 2, -3, 4 }
+// values_output = { 1, 1, 10, 1 }
+//! [reduce_by_key_int]
+    CHECK_RANGE_EQUAL(int, 4, keys_output, (0, 2, -3, 4));
+    CHECK_RANGE_EQUAL(int, 4, values_output, (1, 1, 10, 1));
+}
+
+// 1024 elements: first a single run, then four runs built by scanning three marker keys.
+BOOST_AUTO_TEST_CASE(reduce_by_key_int_long_vector)
+{
+    size_t size = 1024;
+    bc::vector<int> keys_input(size, int(0), queue);
+    bc::vector<int> values_input(size, int(1), queue);
+
+    bc::vector<int> keys_output(size, context);
+    bc::vector<int> values_output(size, context);
+
+    bc::reduce_by_key(keys_input.begin(), keys_input.end(), values_input.begin(),
+                      keys_output.begin(), values_output.begin(), queue);
+
+    CHECK_RANGE_EQUAL(int, 1, keys_output, (0));
+    CHECK_RANGE_EQUAL(int, 1, values_output, (static_cast<int>(size)));
+
+    keys_input[137] = 1; // markers: the inclusive scan below turns them into keys 0, 1, 2, 3
+    keys_input[677] = 1;
+    keys_input[1001] = 1;
+    bc::inclusive_scan(keys_input.begin(), keys_input.end(), keys_input.begin(), queue);
+
+    bc::reduce_by_key(keys_input.begin(), keys_input.end(), values_input.begin(),
+                      keys_output.begin(), values_output.begin(), queue);
+
+    CHECK_RANGE_EQUAL(int, 4, keys_output, (0, 1, 2, 3));
+    CHECK_RANGE_EQUAL(int, 4, values_output, (137, 540, 324, 23)); // run lengths: 137, 677-137, 1001-677, 1024-1001
+}
+
+// Empty input: both output vectors must stay empty.
+BOOST_AUTO_TEST_CASE(reduce_by_key_empty_vector)
+{
+    bc::vector<int> keys_input(context);
+    bc::vector<int> values_input(context);
+
+    bc::vector<int> keys_output(context);
+    bc::vector<int> values_output(context);
+
+    bc::reduce_by_key(keys_input.begin(), keys_input.end(), values_input.begin(),
+                      keys_output.begin(), values_output.begin(), queue);
+
+    BOOST_CHECK(keys_output.empty());
+    BOOST_CHECK(values_output.empty());
+}
+
+// A single key/value pair is passed through unchanged.
+BOOST_AUTO_TEST_CASE(reduce_by_key_int_one_key_value)
+{
+    int keys[] = { 22 };
+    int data[] = { -9 };
+
+    bc::vector<int> keys_input(keys, keys + 1, queue);
+    bc::vector<int> values_input(data, data + 1, queue);
+
+    bc::vector<int> keys_output(1, context);
+    bc::vector<int> values_output(1, context);
+
+    bc::reduce_by_key(keys_input.begin(), keys_input.end(), values_input.begin(),
+                      keys_output.begin(), values_output.begin(), queue);
+
+    CHECK_RANGE_EQUAL(int, 1, keys_output, (22));
+    CHECK_RANGE_EQUAL(int, 1, values_output, (-9));
+}
+
+// Custom reduction functions (min, then max) together with an explicit key-equality predicate.
+BOOST_AUTO_TEST_CASE(reduce_by_key_int_min_max)
+{
+    int keys[] = { 0, 2, 2, 3, 3, 3, 3, 3, 4 };
+    int data[] = { 1, 2, 1, -3, 1, 4, 2, 5, 77 };
+
+    bc::vector<int> keys_input(keys, keys + 9, queue);
+    bc::vector<int> values_input(data, data + 9, queue);
+
+    bc::vector<int> keys_output(9, context);
+    bc::vector<int> values_output(9, context);
+
+    bc::reduce_by_key(keys_input.begin(), keys_input.end(), values_input.begin(),
+                      keys_output.begin(), values_output.begin(), bc::min<int>(),
+                      bc::equal_to<int>(), queue);
+
+    CHECK_RANGE_EQUAL(int, 4, keys_output, (0, 2, 3, 4));
+    CHECK_RANGE_EQUAL(int, 4, values_output, (1, 1, -3, 77));
+
+    bc::reduce_by_key(keys_input.begin(), keys_input.end(), values_input.begin(),
+                      keys_output.begin(), values_output.begin(), bc::max<int>(),
+                      bc::equal_to<int>(), queue);
+
+    CHECK_RANGE_EQUAL(int, 4, keys_output, (0, 2, 3, 4));
+    CHECK_RANGE_EQUAL(int, 4, values_output, (1, 2, 5, 77));
+}
+
+// Float values with a custom function (max) and no explicit key predicate.
+BOOST_AUTO_TEST_CASE(reduce_by_key_float_max)
+{
+    int keys[] = { 0, 2, 2, 3, 3, 3, 3, 3, 4 };
+    float data[] = { 1.0, 2.0, -1.5, -3.0, 1.0, -0.24, 2, 5, 77.1 };
+
+    bc::vector<int> keys_input(keys, keys + 9, queue);
+    bc::vector<float> values_input(data, data + 9, queue);
+
+    bc::vector<int> keys_output(9, context);
+    bc::vector<float> values_output(9, context);
+
+    bc::reduce_by_key(keys_input.begin(), keys_input.end(), values_input.begin(),
+                      keys_output.begin(), values_output.begin(), bc::max<float>(),
+                      queue);
+
+    CHECK_RANGE_EQUAL(int, 4, keys_output, (0, 2, 3, 4));
+    BOOST_CHECK_CLOSE(float(values_output[0]), 1.0f, 1e-4f);
+    BOOST_CHECK_CLOSE(float(values_output[1]), 2.0f, 1e-4f);
+    BOOST_CHECK_CLOSE(float(values_output[2]), 5.0f, 1e-4f);
+    BOOST_CHECK_CLOSE(float(values_output[3]), 77.1f, 1e-4f);
+}
+
+// Vector-typed values (int2_): components are summed independently within each key run.
+BOOST_AUTO_TEST_CASE(reduce_by_key_int2)
+{
+    using bc::int2_;
+
+    int keys[] = { 0, 2, 3, 3, 3, 3, 4, 4 };
+    int2_ data[] = {
+        int2_(0, 1), int2_(-3, 2), int2_(0, 1), int2_(0, 1),
+        int2_(-3, 0), int2_(0, 0), int2_(-3, 2), int2_(-7, -2)
+    };
+
+    bc::vector<int> keys_input(keys, keys + 8, queue);
+    bc::vector<int2_> values_input(data, data + 8, queue);
+
+    bc::vector<int> keys_output(8, context);
+    bc::vector<int2_> values_output(8, context);
+
+    bc::reduce_by_key(keys_input.begin(), keys_input.end(), values_input.begin(),
+                      keys_output.begin(), values_output.begin(), queue);
+
+    CHECK_RANGE_EQUAL(int, 4, keys_output, (0, 2, 3, 4));
+    CHECK_RANGE_EQUAL(int2_, 4, values_output,
+        (int2_(0, 1), int2_(-3, 2), int2_(-3, 2), int2_(-10, 0)));
+}
+
+// Long-vector variant of the int2_ case (same key layout as reduce_by_key_int_long_vector).
+BOOST_AUTO_TEST_CASE(reduce_by_key_int2_long_vector)
+{
+    using bc::int2_;
+
+    size_t size = 1024;
+    bc::vector<int> keys_input(size, int(0), queue);
+    bc::vector<int2_> values_input(size, int2_(1, -1), queue);
+
+    bc::vector<int> keys_output(size, context);
+    bc::vector<int2_> values_output(size, context);
+
+    bc::reduce_by_key(keys_input.begin(), keys_input.end(), values_input.begin(),
+                      keys_output.begin(), values_output.begin(), queue);
+
+    CHECK_RANGE_EQUAL(int, 1, keys_output, (0));
+    CHECK_RANGE_EQUAL(int2_, 1, values_output, (int2_(static_cast<int>(size), -static_cast<int>(size)))); // cast first: negating a size_t stays unsigned
+
+    keys_input[137] = 1; // markers: the inclusive scan below turns them into keys 0, 1, 2, 3
+    keys_input[677] = 1;
+    keys_input[1001] = 1;
+    bc::inclusive_scan(keys_input.begin(), keys_input.end(), keys_input.begin(), queue);
+
+    bc::reduce_by_key(keys_input.begin(), keys_input.end(), values_input.begin(),
+                      keys_output.begin(), values_output.begin(), queue);
+
+    CHECK_RANGE_EQUAL(int, 4, keys_output, (0, 1, 2, 3));
+    CHECK_RANGE_EQUAL(int2_, 4, values_output,
+        (int2_(137, -137), int2_(540, -540), int2_(324, -324), int2_(23, -23)));
+}
+
+BOOST_AUTO_TEST_SUITE_END()