diff --git a/DataMgr/FileMgr/FileMgr.cpp b/DataMgr/FileMgr/FileMgr.cpp index 82360456c5..56b7dfebe8 100644 --- a/DataMgr/FileMgr/FileMgr.cpp +++ b/DataMgr/FileMgr/FileMgr.cpp @@ -38,6 +38,7 @@ #include "Shared/File.h" #include "Shared/checked_alloc.h" #include "Shared/measure.h" +#include "Shared/thread_count.h" constexpr char LEGACY_EPOCH_FILENAME[] = "epoch"; constexpr char EPOCH_FILENAME[] = "epoch_metadata"; @@ -207,7 +208,8 @@ void FileMgr::init(const size_t num_reader_threads, const int32_t epochOverride) endItr; // default construction yields past-the-end int32_t maxFileId = -1; int32_t fileCount = 0; - int32_t threadCount = std::thread::hardware_concurrency(); + int32_t threadCount = cpu_threads(); + std::vector headerVec; std::vector>> file_futures; boost::filesystem::path path(fileMgrBasePath_); @@ -302,7 +304,7 @@ void FileMgr::init(const size_t num_reader_threads, const int32_t epochOverride) /* define number of reader threads to be used */ size_t num_hardware_based_threads = - std::thread::hardware_concurrency(); // # of threads is based on # of cores on the + cpu_threads(); // # of threads is based on # of cores on the // host if (num_reader_threads == 0) { // # of threads has not been defined by user num_reader_threads_ = num_hardware_based_threads; @@ -420,7 +422,7 @@ void FileMgr::init(const std::string& dataPathToConvertFrom, endItr; // default construction yields past-the-end int32_t maxFileId = -1; int32_t fileCount = 0; - int32_t threadCount = std::thread::hardware_concurrency(); + int32_t threadCount = cpu_threads(); std::vector headerVec; std::vector>> file_futures; for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) { diff --git a/DataMgr/ForeignStorage/CsvDataWrapper.cpp b/DataMgr/ForeignStorage/CsvDataWrapper.cpp index fa7adc4345..d351d103c3 100644 --- a/DataMgr/ForeignStorage/CsvDataWrapper.cpp +++ b/DataMgr/ForeignStorage/CsvDataWrapper.cpp @@ -31,6 +31,7 @@ #include "ImportExport/DelimitedParserUtils.h" #include "ImportExport/Importer.h" #include "Shared/sqltypes.h" +#include "Shared/thread_count.h" #include "Utils/DdlUtils.h" namespace foreign_storage { @@ -357,7 +358,7 @@ size_t get_thread_count(const import_export::CopyParams& copy_params, const size_t buffer_size) { size_t thread_count = copy_params.threads; if (thread_count == 0) { - thread_count = std::thread::hardware_concurrency(); + thread_count = cpu_threads(); } if (size_known) { size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size; @@ -374,7 +375,7 @@ size_t get_thread_count(const import_export::CopyParams& copy_params, size_t thread_count = copy_params.threads; if (thread_count == 0) { thread_count = - std::min(std::thread::hardware_concurrency(), file_regions.size()); + std::min(cpu_threads(), file_regions.size()); } CHECK(thread_count); return thread_count; diff --git a/Parser/ParserNode.cpp b/Parser/ParserNode.cpp index 99d9271cfc..b7ee3bd6ca 100644 --- a/Parser/ParserNode.cpp +++ b/Parser/ParserNode.cpp @@ -63,6 +63,7 @@ #include "Shared/StringTransform.h" #include "Shared/measure.h" #include "Shared/shard_key.h" +#include "Shared/thread_count.h" #include "TableArchiver/TableArchiver.h" #include "Utils/FsiUtils.h" @@ -2945,7 +2946,7 @@ void InsertIntoTableAsSelectStmt::populateData(QueryStateProxy query_state_proxy TargetValueConverterFactory factory; - const int num_worker_threads = std::thread::hardware_concurrency(); + const int num_worker_threads = cpu_threads(); std::vector thread_start_idx(num_worker_threads), thread_end_idx(num_worker_threads); diff --git a/Shared/Compressor.cpp b/Shared/Compressor.cpp index 0ee80fc5d0..221c2e6653 100644 --- a/Shared/Compressor.cpp +++ b/Shared/Compressor.cpp @@ -23,6 +23,7 @@ **/ #include "Shared/Compressor.h" +#include "Shared/thread_count.h" #include #include @@ -41,7 +42,7 @@ BloscCompressor::BloscCompressor() { // We use maximum number of threads here since with tests we found that compression // speed gets lear scalling with corresponding to the number of threads being used. - blosc_set_nthreads(std::thread::hardware_concurrency()); + blosc_set_nthreads(cpu_threads()); // We chosse faster compressor, accepting slightly lower compression ratio // https://lz4.github.io/lz4/ diff --git a/Shared/Datum.cpp b/Shared/Datum.cpp index 6f8ef7fa5b..f4403cea6e 100644 --- a/Shared/Datum.cpp +++ b/Shared/Datum.cpp @@ -69,7 +69,7 @@ std::string SQLTypeInfo::comp_name[kENCODING_LAST] = {"NONE", "FIXED", "RL", "DIFF", "DICT", "SPARSE", "COMPRESSED", "DAYS"}; int64_t parse_numeric(const std::string_view s, SQLTypeInfo& ti) { - assert(s.length() <= 20); + assert(s.length() <= 30); size_t dot = s.find_first_of('.', 0); std::string before_dot; std::string after_dot; diff --git a/Shared/thread_count.h b/Shared/thread_count.h index 52a93d1441..90e1254e0f 100644 --- a/Shared/thread_count.h +++ b/Shared/thread_count.h @@ -23,5 +23,5 @@ extern unsigned g_cpu_threads_override; inline int cpu_threads() { auto ov = g_cpu_threads_override; - return (ov <= 0) ? std::max(2 * std::thread::hardware_concurrency(), 1U) : ov; + return (ov <= 0) ? std::max(std::thread::hardware_concurrency(), 1U) : ov; } diff --git a/StringDictionary/StringDictionary.cpp b/StringDictionary/StringDictionary.cpp index 37c3d76fa2..763bbc3c06 100644 --- a/StringDictionary/StringDictionary.cpp +++ b/StringDictionary/StringDictionary.cpp @@ -15,6 +15,7 @@ */ #include "StringDictionary/StringDictionary.h" +#include "Shared/thread_count.h" #include #include @@ -168,7 +169,8 @@ StringDictionary::StringDictionary(const std::string& folder, mapd_lock_guard write_lock(rw_mutex_); uint32_t thread_inits = 0; - const auto thread_count = std::thread::hardware_concurrency(); + const auto thread_count = cpu_threads(); + const uint32_t items_per_thread = std::max( 2000, std::min(200000, (str_count / thread_count) + 1)); std::vector>>> @@ -1480,7 +1482,7 @@ void StringDictionary::populate_string_array_ids( } }; - const int num_worker_threads = std::thread::hardware_concurrency(); + const int num_worker_threads = cpu_threads(); if (source_array_ids.size() / num_worker_threads > 10) { std::vector> worker_threads; diff --git a/ThriftHandler/CommandLineOptions.cpp b/ThriftHandler/CommandLineOptions.cpp index cf1df28a0e..8d3bd7262c 100644 --- a/ThriftHandler/CommandLineOptions.cpp +++ b/ThriftHandler/CommandLineOptions.cpp @@ -25,6 +25,7 @@ #include "MapDRelease.h" #include "QueryEngine/GroupByAndAggregate.h" #include "Shared/Compressor.h" +#include "Shared/thread_count.h" #include "StringDictionary/StringDictionary.h" #include "Utils/DdlUtils.h" @@ -122,6 +123,10 @@ void CommandLineOptions::fillOptions() { "exit-after-warmup", po::value(&exit_after_warmup)->default_value(false)->implicit_value(true), "Exit after OmniSci warmup queries."); + help_desc.add_options()("max-num-threads", + po::value(&g_cpu_threads_override) + ->default_value(std::thread::hardware_concurrency()), + "Set maximum number of threads"); help_desc.add_options()("dynamic-watchdog-time-limit", po::value(&dynamic_watchdog_time_limit) ->default_value(dynamic_watchdog_time_limit) diff --git a/ThriftHandler/DBHandler.cpp b/ThriftHandler/DBHandler.cpp index 5b0b47cccf..122869a15f 100644 --- a/ThriftHandler/DBHandler.cpp +++ b/ThriftHandler/DBHandler.cpp @@ -67,6 +67,7 @@ #include "QueryEngine/ThriftSerializers.h" #include "Shared/ArrowUtil.h" #include "Shared/StringTransform.h" +#include "Shared/thread_count.h" #include "Shared/import_helpers.h" #include "Shared/mapd_shared_mutex.h" #include "Shared/measure.h" @@ -753,7 +754,7 @@ void DBHandler::get_hardware_info(TClusterHardwareInfo& _return, } // start hardware/OS dependent code - ret.num_cpu_hw = std::thread::hardware_concurrency(); + ret.num_cpu_hw = cpu_threads(); // ^ This might return diffrent results in case of hyper threading // end hardware/OS dependent code