Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Control the number of threads globally in Omnisci #622

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions DataMgr/FileMgr/FileMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "Shared/File.h"
#include "Shared/checked_alloc.h"
#include "Shared/measure.h"
#include "Shared/thread_count.h"

constexpr char LEGACY_EPOCH_FILENAME[] = "epoch";
constexpr char EPOCH_FILENAME[] = "epoch_metadata";
Expand Down Expand Up @@ -207,7 +208,8 @@ void FileMgr::init(const size_t num_reader_threads, const int32_t epochOverride)
endItr; // default construction yields past-the-end
int32_t maxFileId = -1;
int32_t fileCount = 0;
int32_t threadCount = std::thread::hardware_concurrency();
int32_t threadCount = cpu_threads();

std::vector<HeaderInfo> headerVec;
std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
boost::filesystem::path path(fileMgrBasePath_);
Expand Down Expand Up @@ -302,7 +304,7 @@ void FileMgr::init(const size_t num_reader_threads, const int32_t epochOverride)

/* define number of reader threads to be used */
size_t num_hardware_based_threads =
std::thread::hardware_concurrency(); // # of threads is based on # of cores on the
cpu_threads(); // # of threads is based on # of cores on the
// host
if (num_reader_threads == 0) { // # of threads has not been defined by user
num_reader_threads_ = num_hardware_based_threads;
Expand Down Expand Up @@ -420,7 +422,7 @@ void FileMgr::init(const std::string& dataPathToConvertFrom,
endItr; // default construction yields past-the-end
int32_t maxFileId = -1;
int32_t fileCount = 0;
int32_t threadCount = std::thread::hardware_concurrency();
int32_t threadCount = cpu_threads();
std::vector<HeaderInfo> headerVec;
std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
Expand Down
5 changes: 3 additions & 2 deletions DataMgr/ForeignStorage/CsvDataWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "ImportExport/DelimitedParserUtils.h"
#include "ImportExport/Importer.h"
#include "Shared/sqltypes.h"
#include "Shared/thread_count.h"
#include "Utils/DdlUtils.h"

namespace foreign_storage {
Expand Down Expand Up @@ -357,7 +358,7 @@ size_t get_thread_count(const import_export::CopyParams& copy_params,
const size_t buffer_size) {
size_t thread_count = copy_params.threads;
if (thread_count == 0) {
thread_count = std::thread::hardware_concurrency();
thread_count = cpu_threads();
}
if (size_known) {
size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
Expand All @@ -374,7 +375,7 @@ size_t get_thread_count(const import_export::CopyParams& copy_params,
size_t thread_count = copy_params.threads;
if (thread_count == 0) {
thread_count =
std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
std::min<size_t>(cpu_threads(), file_regions.size());
}
CHECK(thread_count);
return thread_count;
Expand Down
3 changes: 2 additions & 1 deletion Parser/ParserNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
#include "Shared/StringTransform.h"
#include "Shared/measure.h"
#include "Shared/shard_key.h"
#include "Shared/thread_count.h"
#include "TableArchiver/TableArchiver.h"
#include "Utils/FsiUtils.h"

Expand Down Expand Up @@ -2945,7 +2946,7 @@ void InsertIntoTableAsSelectStmt::populateData(QueryStateProxy query_state_proxy

TargetValueConverterFactory factory;

const int num_worker_threads = std::thread::hardware_concurrency();
const int num_worker_threads = cpu_threads();

std::vector<size_t> thread_start_idx(num_worker_threads),
thread_end_idx(num_worker_threads);
Expand Down
3 changes: 2 additions & 1 deletion Shared/Compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
**/

#include "Shared/Compressor.h"
#include "Shared/thread_count.h"

#include <cstdint>
#include <memory>
Expand All @@ -41,7 +42,7 @@ BloscCompressor::BloscCompressor() {
// We use maximum number of threads here since with tests we found that compression
// speed scales linearly with the number of threads being used.

blosc_set_nthreads(std::thread::hardware_concurrency());
blosc_set_nthreads(cpu_threads());

// We choose the faster compressor, accepting a slightly lower compression ratio
// https://lz4.github.io/lz4/
Expand Down
2 changes: 1 addition & 1 deletion Shared/Datum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ std::string SQLTypeInfo::comp_name[kENCODING_LAST] =
{"NONE", "FIXED", "RL", "DIFF", "DICT", "SPARSE", "COMPRESSED", "DAYS"};

int64_t parse_numeric(const std::string_view s, SQLTypeInfo& ti) {
assert(s.length() <= 20);
assert(s.length() <= 30);
size_t dot = s.find_first_of('.', 0);
std::string before_dot;
std::string after_dot;
Expand Down
2 changes: 1 addition & 1 deletion Shared/thread_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ extern unsigned g_cpu_threads_override;

// Returns the number of CPU threads to use: the global override
// g_cpu_threads_override when it is non-zero, otherwise a value derived from
// std::thread::hardware_concurrency(), raised to at least 1 by std::max.
// NOTE(review): this text looks like a rendered diff ("1 addition & 1
// deletion"), so the two return statements below are presumably the
// pre-change (2x) and post-change (1x) versions of one line; confirm which
// one the real file contains.
inline int cpu_threads() {
auto ov = g_cpu_threads_override;
// The override is declared `unsigned`, so `ov <= 0` is effectively `ov == 0`.
// The unsigned result is implicitly converted to the int return type.
return (ov <= 0) ? std::max(2 * std::thread::hardware_concurrency(), 1U) : ov;
return (ov <= 0) ? std::max(std::thread::hardware_concurrency(), 1U) : ov;
}
6 changes: 4 additions & 2 deletions StringDictionary/StringDictionary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "StringDictionary/StringDictionary.h"
#include "Shared/thread_count.h"

#include <tbb/parallel_for.h>
#include <boost/filesystem/operations.hpp>
Expand Down Expand Up @@ -168,7 +169,8 @@ StringDictionary::StringDictionary(const std::string& folder,
mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);

uint32_t thread_inits = 0;
const auto thread_count = std::thread::hardware_concurrency();
const auto thread_count = cpu_threads();

const uint32_t items_per_thread = std::max<uint32_t>(
2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>
Expand Down Expand Up @@ -1480,7 +1482,7 @@ void StringDictionary::populate_string_array_ids(
}
};

const int num_worker_threads = std::thread::hardware_concurrency();
const int num_worker_threads = cpu_threads();

if (source_array_ids.size() / num_worker_threads > 10) {
std::vector<std::future<void>> worker_threads;
Expand Down
5 changes: 5 additions & 0 deletions ThriftHandler/CommandLineOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "MapDRelease.h"
#include "QueryEngine/GroupByAndAggregate.h"
#include "Shared/Compressor.h"
#include "Shared/thread_count.h"
#include "StringDictionary/StringDictionary.h"
#include "Utils/DdlUtils.h"

Expand Down Expand Up @@ -122,6 +123,10 @@ void CommandLineOptions::fillOptions() {
"exit-after-warmup",
po::value<bool>(&exit_after_warmup)->default_value(false)->implicit_value(true),
"Exit after OmniSci warmup queries.");
help_desc.add_options()("max-num-threads",
po::value<unsigned>(&g_cpu_threads_override)
->default_value(std::thread::hardware_concurrency()),
"Set maximum number of threads");
help_desc.add_options()("dynamic-watchdog-time-limit",
po::value<unsigned>(&dynamic_watchdog_time_limit)
->default_value(dynamic_watchdog_time_limit)
Expand Down
3 changes: 2 additions & 1 deletion ThriftHandler/DBHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
#include "QueryEngine/ThriftSerializers.h"
#include "Shared/ArrowUtil.h"
#include "Shared/StringTransform.h"
#include "Shared/thread_count.h"
#include "Shared/import_helpers.h"
#include "Shared/mapd_shared_mutex.h"
#include "Shared/measure.h"
Expand Down Expand Up @@ -753,7 +754,7 @@ void DBHandler::get_hardware_info(TClusterHardwareInfo& _return,
}

// start hardware/OS dependent code
ret.num_cpu_hw = std::thread::hardware_concurrency();
ret.num_cpu_hw = cpu_threads();
// ^ This might return different results in case of hyper-threading
// end hardware/OS dependent code

Expand Down