Skip to content

Commit

Permalink
Merge pull request #58819 from ClickHouse/backport/23.8/58665
Browse files Browse the repository at this point in the history
Backport #58665 to 23.8: Add SYSTEM commands and Keeper 4LW for jemalloc
  • Loading branch information
alexey-milovidov committed Jan 15, 2024
2 parents 4fd6952 + 548b266 commit 92166c0
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 21 deletions.
1 change: 1 addition & 0 deletions programs/keeper/CMakeLists.txt
Expand Up @@ -94,6 +94,7 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/CurrentThread.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/NamedCollections/NamedCollections.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/NamedCollections/NamedCollectionConfiguration.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/Jemalloc.cpp

${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/IKeeper.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/TestKeeper.cpp
Expand Down
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Expand Up @@ -190,6 +190,7 @@ enum class AccessType
M(SYSTEM_UNFREEZE, "SYSTEM UNFREEZE", GLOBAL, SYSTEM) \
M(SYSTEM_FAILPOINT, "SYSTEM ENABLE FAILPOINT, SYSTEM DISABLE FAILPOINT", GLOBAL, SYSTEM) \
M(SYSTEM_LISTEN, "SYSTEM START LISTEN, SYSTEM STOP LISTEN", GLOBAL, SYSTEM) \
M(SYSTEM_JEMALLOC, "SYSTEM JEMALLOC PURGE, SYSTEM JEMALLOC ENABLE PROFILE, SYSTEM JEMALLOC DISABLE PROFILE, SYSTEM JEMALLOC FLUSH PROFILE", GLOBAL, SYSTEM) \
M(SYSTEM, "", GROUP, ALL) /* allows to execute SYSTEM {SHUTDOWN|RELOAD CONFIG|...} */ \
\
M(dictGet, "dictHas, dictGetHierarchy, dictIsIn", DICTIONARY, ALL) /* allows to execute functions dictGet(), dictHas(), dictGetHierarchy(), dictIsIn() */\
Expand Down
88 changes: 88 additions & 0 deletions src/Common/Jemalloc.cpp
@@ -0,0 +1,88 @@
#include <Common/Jemalloc.h>

#if USE_JEMALLOC

#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <jemalloc/jemalloc.h>

#define STRINGIFY_HELPER(x) #x
#define STRINGIFY(x) STRINGIFY_HELPER(x)

namespace ProfileEvents
{
extern const Event MemoryAllocatorPurge;
extern const Event MemoryAllocatorPurgeTimeMicroseconds;
}

namespace DB
{

namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}

void purgeJemallocArenas()
{
LOG_TRACE(&Poco::Logger::get("SystemJemalloc"), "Purging unused memory");
Stopwatch watch;
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, watch.elapsedMicroseconds());
}

void checkJemallocProfilingEnabled()
{
bool active = true;
size_t active_size = sizeof(active);
mallctl("opt.prof", &active, &active_size, nullptr, 0);

if (!active)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"ClickHouse was started without enabling profiling for jemalloc. To use jemalloc's profiler, following env variable should be "
"set: MALLOC_CONF=background_thread:true,prof:true");
}

void setJemallocProfileActive(bool value)
{
checkJemallocProfilingEnabled();
bool active = true;
size_t active_size = sizeof(active);
mallctl("prof.active", &active, &active_size, nullptr, 0);
if (active == value)
{
LOG_TRACE(&Poco::Logger::get("SystemJemalloc"), "Profiling is already {}", active ? "enabled" : "disabled");
return;
}

mallctl("prof.active", nullptr, nullptr, &value, sizeof(bool));
LOG_TRACE(&Poco::Logger::get("SystemJemalloc"), "Profiling is {}", value ? "enabled" : "disabled");
}

std::string flushJemallocProfile(const std::string & file_prefix)
{
checkJemallocProfilingEnabled();
char * prefix_buffer;
size_t prefix_size = sizeof(prefix_buffer);
int n = mallctl("opt.prof_prefix", &prefix_buffer, &prefix_size, nullptr, 0);
if (!n && std::string_view(prefix_buffer) != "jeprof")
{
LOG_TRACE(&Poco::Logger::get("SystemJemalloc"), "Flushing memory profile with prefix {}", prefix_buffer);
mallctl("prof.dump", nullptr, nullptr, nullptr, 0);
return prefix_buffer;
}

static std::atomic<size_t> profile_counter{0};
std::string profile_dump_path = fmt::format("{}.{}.{}.heap", file_prefix, getpid(), profile_counter.fetch_add(1));
const auto * profile_dump_path_str = profile_dump_path.c_str();

LOG_TRACE(&Poco::Logger::get("SystemJemalloc"), "Flushing memory profile to {}", profile_dump_path_str);
mallctl("prof.dump", nullptr, nullptr, &profile_dump_path_str, sizeof(profile_dump_path_str));
return profile_dump_path;
}

}

#endif
22 changes: 22 additions & 0 deletions src/Common/Jemalloc.h
@@ -0,0 +1,22 @@
#pragma once

#include "config.h"

#if USE_JEMALLOC

#include <string>

namespace DB
{

void purgeJemallocArenas();

void checkJemallocProfilingEnabled();

void setJemallocProfileActive(bool value);

std::string flushJemallocProfile(const std::string & file_prefix);

}

#endif
9 changes: 7 additions & 2 deletions src/Coordination/CoordinationSettings.cpp
@@ -1,10 +1,11 @@
#include <Coordination/CoordinationSettings.h>
#include <Common/logger_useful.h>
#include <filesystem>
#include <Coordination/Defines.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteIntText.h>

#include "config.h"

namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -36,7 +37,11 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco
}


const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl";
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD =
#if USE_JEMALLOC
"jmst,jmfp,jmep,jmdp,"
#endif
"conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl";

KeeperConfigurationAndSettings::KeeperConfigurationAndSettings()
: server_id(NOT_EXIST)
Expand Down
52 changes: 52 additions & 0 deletions src/Coordination/FourLetterCommand.cpp
Expand Up @@ -18,6 +18,11 @@
#include <unistd.h>
#include <bit>

#if USE_JEMALLOC
#include <Common/Jemalloc.h>
#include <jemalloc/jemalloc.h>
#endif

namespace
{

Expand Down Expand Up @@ -172,6 +177,20 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat
FourLetterCommandPtr feature_flags_command = std::make_shared<FeatureFlagsCommand>(keeper_dispatcher);
factory.registerCommand(feature_flags_command);

#if USE_JEMALLOC
FourLetterCommandPtr jemalloc_dump_stats = std::make_shared<JemallocDumpStats>(keeper_dispatcher);
factory.registerCommand(jemalloc_dump_stats);

FourLetterCommandPtr jemalloc_flush_profile = std::make_shared<JemallocFlushProfile>(keeper_dispatcher);
factory.registerCommand(jemalloc_flush_profile);

FourLetterCommandPtr jemalloc_enable_profile = std::make_shared<JemallocEnableProfile>(keeper_dispatcher);
factory.registerCommand(jemalloc_enable_profile);

FourLetterCommandPtr jemalloc_disable_profile = std::make_shared<JemallocDisableProfile>(keeper_dispatcher);
factory.registerCommand(jemalloc_disable_profile);
#endif

factory.initializeAllowList(keeper_dispatcher);
factory.setInitialize(true);
}
Expand Down Expand Up @@ -579,4 +598,37 @@ String FeatureFlagsCommand::run()
return ret.str();
}

#if USE_JEMALLOC

void printToString(void * output, const char * data)
{
std::string * output_data = reinterpret_cast<std::string *>(output);
*output_data += std::string(data);
}

String JemallocDumpStats::run()
{
std::string output;
malloc_stats_print(printToString, &output, nullptr);
return output;
}

String JemallocFlushProfile::run()
{
return flushJemallocProfile("/tmp/jemalloc_keeper");
}

String JemallocEnableProfile::run()
{
setJemallocProfileActive(true);
return "ok";
}

String JemallocDisableProfile::run()
{
setJemallocProfileActive(false);
return "ok";
}
#endif

}
51 changes: 51 additions & 0 deletions src/Coordination/FourLetterCommand.h
Expand Up @@ -415,4 +415,55 @@ struct FeatureFlagsCommand : public IFourLetterCommand
~FeatureFlagsCommand() override = default;
};

#if USE_JEMALLOC
struct JemallocDumpStats : public IFourLetterCommand
{
explicit JemallocDumpStats(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_)
{
}

String name() override { return "jmst"; }
String run() override;
~JemallocDumpStats() override = default;

};

struct JemallocFlushProfile : public IFourLetterCommand
{
explicit JemallocFlushProfile(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_)
{
}

String name() override { return "jmfp"; }
String run() override;
~JemallocFlushProfile() override = default;
};

struct JemallocEnableProfile : public IFourLetterCommand
{
explicit JemallocEnableProfile(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_)
{
}

String name() override { return "jmep"; }
String run() override;
~JemallocEnableProfile() override = default;
};

struct JemallocDisableProfile : public IFourLetterCommand
{
explicit JemallocDisableProfile(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_)
{
}

String name() override { return "jmdp"; }
String run() override;
~JemallocDisableProfile() override = default;
};
#endif

}
20 changes: 2 additions & 18 deletions src/Coordination/KeeperDispatcher.cpp
Expand Up @@ -14,16 +14,10 @@

#include <future>
#include <chrono>
#include <filesystem>
#include <iterator>
#include <limits>

#if USE_JEMALLOC
# include <jemalloc/jemalloc.h>

#define STRINGIFY_HELPER(x) #x
#define STRINGIFY(x) STRINGIFY_HELPER(x)

#include <Common/Jemalloc.h>
#endif

namespace CurrentMetrics
Expand All @@ -32,12 +26,6 @@ namespace CurrentMetrics
extern const Metric KeeperOutstandingRequets;
}

namespace ProfileEvents
{
extern const Event MemoryAllocatorPurge;
extern const Event MemoryAllocatorPurgeTimeMicroseconds;
}

using namespace std::chrono_literals;

namespace DB
Expand Down Expand Up @@ -877,11 +865,7 @@ Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const
void KeeperDispatcher::cleanResources()
{
#if USE_JEMALLOC
LOG_TRACE(&Poco::Logger::get("KeeperDispatcher"), "Purging unused memory");
Stopwatch watch;
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, watch.elapsedMicroseconds());
purgeJemallocArenas();
#endif
}

Expand Down
41 changes: 40 additions & 1 deletion src/Interpreters/InterpreterSystemQuery.cpp
Expand Up @@ -67,6 +67,10 @@
#include <IO/S3/Client.h>
#endif

#if USE_JEMALLOC
#include <Common/Jemalloc.h>
#endif

#include "config.h"

namespace CurrentMetrics
Expand All @@ -88,7 +92,6 @@ namespace ErrorCodes
extern const int TABLE_WAS_NOT_DROPPED;
}


namespace ActionLocks
{
extern const StorageActionBlockType PartsMerge;
Expand Down Expand Up @@ -661,6 +664,32 @@ BlockIO InterpreterSystemQuery::execute()
FailPointInjection::disableFailPoint(query.fail_point_name);
break;
}
#if USE_JEMALLOC
case Type::JEMALLOC_PURGE:
{
getContext()->checkAccess(AccessType::SYSTEM_JEMALLOC);
purgeJemallocArenas();
break;
}
case Type::JEMALLOC_ENABLE_PROFILE:
{
getContext()->checkAccess(AccessType::SYSTEM_JEMALLOC);
setJemallocProfileActive(true);
break;
}
case Type::JEMALLOC_DISABLE_PROFILE:
{
getContext()->checkAccess(AccessType::SYSTEM_JEMALLOC);
setJemallocProfileActive(false);
break;
}
case Type::JEMALLOC_FLUSH_PROFILE:
{
getContext()->checkAccess(AccessType::SYSTEM_JEMALLOC);
flushJemallocProfile("/tmp/jemalloc_clickhouse");
break;
}
#endif
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown type of SYSTEM query");
}
Expand Down Expand Up @@ -1261,6 +1290,16 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_LISTEN);
break;
}
#if USE_JEMALLOC
case Type::JEMALLOC_PURGE:
case Type::JEMALLOC_ENABLE_PROFILE:
case Type::JEMALLOC_DISABLE_PROFILE:
case Type::JEMALLOC_FLUSH_PROFILE:
{
required_access.emplace_back(AccessType::SYSTEM_JEMALLOC);
break;
}
#endif
case Type::STOP_THREAD_FUZZER:
case Type::START_THREAD_FUZZER:
case Type::ENABLE_FAILPOINT:
Expand Down

0 comments on commit 92166c0

Please sign in to comment.