Commit 8fc05e2

Merge pull request #58310 from azat/kafka-fix-stat-leak-resubmit

Create consumers for Kafka tables on fly with TTL (resubmit)

alexey-milovidov committed Dec 30, 2023
2 parents e1812f3 + 853fdfe, commit 8fc05e2
Showing 9 changed files with 379 additions and 194 deletions.
src/Common/setThreadName.cpp (13 additions, 7 deletions)

@@ -28,25 +28,31 @@ namespace ErrorCodes
 static thread_local char thread_name[THREAD_NAME_SIZE]{};
 
 
-void setThreadName(const char * name)
+void setThreadName(const char * name, bool truncate)
 {
-    if (strlen(name) > THREAD_NAME_SIZE - 1)
+    size_t name_len = strlen(name);
+    if (!truncate && name_len > THREAD_NAME_SIZE - 1)
         throw DB::Exception(DB::ErrorCodes::PTHREAD_ERROR, "Thread name cannot be longer than 15 bytes");
 
+    size_t name_capped_len = std::min<size_t>(1 + name_len, THREAD_NAME_SIZE - 1);
+    char name_capped[THREAD_NAME_SIZE];
+    memcpy(name_capped, name, name_capped_len);
+    name_capped[name_capped_len] = '\0';
+
 #if defined(OS_FREEBSD)
-    pthread_set_name_np(pthread_self(), name);
+    pthread_set_name_np(pthread_self(), name_capped);
     if ((false))
 #elif defined(OS_DARWIN)
-    if (0 != pthread_setname_np(name))
+    if (0 != pthread_setname_np(name_capped))
 #elif defined(OS_SUNOS)
-    if (0 != pthread_setname_np(pthread_self(), name))
+    if (0 != pthread_setname_np(pthread_self(), name_capped))
 #else
-    if (0 != prctl(PR_SET_NAME, name, 0, 0, 0))
+    if (0 != prctl(PR_SET_NAME, name_capped, 0, 0, 0))
 #endif
         if (errno != ENOSYS && errno != EPERM) /// It's ok if the syscall is unsupported or not allowed in some environments.
             throw DB::ErrnoException(DB::ErrorCodes::PTHREAD_ERROR, "Cannot set thread name with prctl(PR_SET_NAME, ...)");
 
-    memcpy(thread_name, name, std::min<size_t>(1 + strlen(name), THREAD_NAME_SIZE - 1));
+    memcpy(thread_name, name_capped, name_capped_len);
 }
 
 const char * getThreadName()
src/Common/setThreadName.h (3 additions, 1 deletion)

@@ -4,7 +4,9 @@
 /** Sets the thread name (maximum length is 15 bytes),
   * which will be visible in ps, gdb, /proc,
   * for convenience of observation and debugging.
+  *
+  * @param truncate - if true, truncate the name to 15 bytes automatically; otherwise throw on overly long names
   */
-void setThreadName(const char * name);
+void setThreadName(const char * name, bool truncate = false);
 
 const char * getThreadName();
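
Note on the change above: with truncate = true the function caps the name to THREAD_NAME_SIZE - 1 = 15 bytes and always NUL-terminates, instead of throwing. A minimal standalone sketch of the same capping arithmetic (illustrative code, not part of the commit; capThreadName is a made-up helper name):

#include <algorithm>
#include <cstdio>
#include <cstring>

static constexpr std::size_t THREAD_NAME_SIZE = 16; // matches the 16-byte kernel limit behind prctl(PR_SET_NAME)

// Mirrors the name_capped logic above: copy at most 15 bytes and always NUL-terminate.
static void capThreadName(const char * name, char (&out)[THREAD_NAME_SIZE])
{
    std::size_t name_len = std::strlen(name);
    std::size_t capped_len = std::min<std::size_t>(1 + name_len, THREAD_NAME_SIZE - 1);
    std::memcpy(out, name, capped_len);
    out[capped_len] = '\0';
}

int main()
{
    char capped[THREAD_NAME_SIZE];
    capThreadName("KafkaCleanupConsumersThread", capped); // 27 bytes: would previously throw
    std::printf("%s\n", capped);                          // prints the first 15 bytes: "KafkaCleanupCon"
}

Call sites with short names keep their old throwing contract, since truncate defaults to false.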
src/Storages/Kafka/KafkaConsumer.cpp (66 additions, 11 deletions)

@@ -13,6 +13,7 @@

 #include <Common/CurrentMetrics.h>
 #include <Common/ProfileEvents.h>
+#include <base/defines.h>
 
 namespace CurrentMetrics
 {
@@ -46,15 +47,13 @@ const auto DRAIN_TIMEOUT_MS = 5000ms;


 KafkaConsumer::KafkaConsumer(
-    ConsumerPtr consumer_,
     Poco::Logger * log_,
     size_t max_batch_size,
     size_t poll_timeout_,
     bool intermediate_commit_,
     const std::atomic<bool> & stopped_,
     const Names & _topics)
-    : consumer(consumer_)
-    , log(log_)
+    : log(log_)
     , batch_size(max_batch_size)
     , poll_timeout(poll_timeout_)
     , intermediate_commit(intermediate_commit_)
@@ -63,6 +62,25 @@ KafkaConsumer::KafkaConsumer(
     , topics(_topics)
     , exceptions_buffer(EXCEPTIONS_DEPTH)
 {
+}
+
+void KafkaConsumer::createConsumer(cppkafka::Configuration consumer_config)
+{
+    chassert(!consumer.get());
+
+    /// Using this should be safe, since cppkafka::Consumer can poll messages
+    /// (including statistics, which will trigger the callback below) only via
+    /// KafkaConsumer.
+    if (consumer_config.get("statistics.interval.ms") != "0")
+    {
+        consumer_config.set_stats_callback([this](cppkafka::KafkaHandleBase &, const std::string & stat_json)
+        {
+            setRDKafkaStat(stat_json);
+        });
+    }
+    consumer = std::make_shared<cppkafka::Consumer>(consumer_config);
+    consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
+
     // called (synchronously, during poll) when we enter the consumer group
     consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
     {
@@ -133,8 +151,30 @@
     });
 }
+
+ConsumerPtr && KafkaConsumer::moveConsumer()
+{
+    cleanUnprocessed();
+    if (!consumer->get_subscription().empty())
+    {
+        try
+        {
+            consumer->unsubscribe();
+        }
+        catch (const cppkafka::HandleException & e)
+        {
+            LOG_ERROR(log, "Error during unsubscribe: {}", e.what());
+        }
+        drain();
+    }
+    return std::move(consumer);
+}
+
 KafkaConsumer::~KafkaConsumer()
 {
+    if (!consumer)
+        return;
+
     cleanUnprocessed();
     try
     {
         if (!consumer->get_subscription().empty())
@@ -154,7 +194,6 @@ KafkaConsumer::~KafkaConsumer()
     {
         LOG_ERROR(log, "Error while destructing consumer: {}", e.what());
     }
-
 }
 
 // Needed to drain rest of the messages / queued callback calls from the consumer
@@ -568,6 +607,9 @@ void KafkaConsumer::setExceptionInfo(const std::string & text, bool with_stacktrace
  */
 std::string KafkaConsumer::getMemberId() const
 {
+    if (!consumer)
+        return "";
+
     char * memberid_ptr = rd_kafka_memberid(consumer->get_handle());
     std::string memberid_string = memberid_ptr;
     rd_kafka_mem_free(nullptr, memberid_ptr);
@@ -578,8 +620,14 @@ std::string KafkaConsumer::getMemberId() const
 KafkaConsumer::Stat KafkaConsumer::getStat() const
 {
     KafkaConsumer::Stat::Assignments assignments;
-    auto cpp_assignments = consumer->get_assignment();
-    auto cpp_offsets = consumer->get_offsets_position(cpp_assignments);
+    cppkafka::TopicPartitionList cpp_assignments;
+    cppkafka::TopicPartitionList cpp_offsets;
+
+    if (consumer)
+    {
+        cpp_assignments = consumer->get_assignment();
+        cpp_offsets = consumer->get_offsets_position(cpp_assignments);
+    }
 
     for (size_t num = 0; num < cpp_assignments.size(); ++num)
     {
@@ -591,7 +639,7 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const
     }
 
     return {
-        .consumer_id = getMemberId() /* consumer->get_member_id() */ ,
+        .consumer_id = getMemberId(),
         .assignments = std::move(assignments),
         .last_poll_time = last_poll_timestamp_usec.load(),
         .num_messages_read = num_messages_read.load(),
@@ -601,11 +649,18 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const
         .num_commits = num_commits.load(),
         .num_rebalance_assignments = num_rebalance_assignments.load(),
         .num_rebalance_revocations = num_rebalance_revocations.load(),
-        .exceptions_buffer = [&](){std::lock_guard<std::mutex> lock(exception_mutex);
-                                   return exceptions_buffer;}(),
+        .exceptions_buffer = [&]()
+        {
+            std::lock_guard<std::mutex> lock(exception_mutex);
+            return exceptions_buffer;
+        }(),
         .in_use = in_use.load(),
-        .rdkafka_stat = [&](){std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
-                              return rdkafka_stat;}(),
+        .last_used_usec = last_used_usec.load(),
+        .rdkafka_stat = [&]()
+        {
+            std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
+            return rdkafka_stat;
+        }(),
     };
 }

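Together, createConsumer() and moveConsumer() let the storage layer create the librdkafka handle lazily and release it once a consumer has been idle past the TTL, while the KafkaConsumer wrapper (and the statistics it accumulated for system.kafka_consumers) stays alive. A hedged sketch of that reaping cycle with simplified stand-in types (the pool shape and function names are illustrative, not the commit's actual StorageKafka code):

#include <chrono>
#include <memory>
#include <vector>

using UInt64 = unsigned long long;

static UInt64 nowUsec()
{
    return std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::system_clock::now().time_since_epoch()).count();
}

// Stand-in for KafkaConsumer: just the parts of the new interface this sketch needs.
struct Consumer
{
    bool has_handle = false;    // models hasConsumer()
    bool in_use = false;        // models isInUse()
    UInt64 last_used_usec = 0;  // models getLastUsedUsec()

    // models moveConsumer(): unsubscribe, drain, and hand the handle back for destruction
    void releaseHandle() { has_handle = false; }
};

// Runs periodically (every KAFKA_RESCHEDULE_MS in the real code): drop the underlying
// handle of consumers idle longer than the TTL; the wrapper object survives, so its
// accumulated statistics remain visible in system.kafka_consumers.
static void cleanupIdleConsumers(std::vector<std::shared_ptr<Consumer>> & pool, UInt64 ttl_usec)
{
    const UInt64 now = nowUsec();
    for (auto & consumer : pool)
        if (consumer->has_handle && !consumer->in_use && now - consumer->last_used_usec > ttl_usec)
            consumer->releaseHandle();
}

int main()
{
    std::vector<std::shared_ptr<Consumer>> pool{std::make_shared<Consumer>()};
    pool[0]->has_handle = true;                         // createConsumer() ran on first use
    pool[0]->last_used_usec = nowUsec() - 120'000'000;  // notInUse() stamped it 2 minutes ago
    cleanupIdleConsumers(pool, 60'000'000);             // 60 s TTL, the kafka_consumers_pool_ttl_ms default
}

This is also why getMemberId() and getStat() above gained null checks: after reaping, a KafkaConsumer can legitimately exist without an underlying consumer handle.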
src/Storages/Kafka/KafkaConsumer.h (24 additions, 10 deletions)

@@ -57,12 +57,11 @@ class KafkaConsumer
         UInt64 num_rebalance_revocations;
         KafkaConsumer::ExceptionsBuffer exceptions_buffer;
         bool in_use;
+        UInt64 last_used_usec;
         std::string rdkafka_stat;
     };
 
 public:
     KafkaConsumer(
-        ConsumerPtr consumer_,
         Poco::Logger * log_,
         size_t max_batch_size,
         size_t poll_timeout_,
@@ -72,6 +71,11 @@
     );
 
     ~KafkaConsumer();
+
+    void createConsumer(cppkafka::Configuration consumer_config);
+    bool hasConsumer() const { return consumer.get() != nullptr; }
+    ConsumerPtr && moveConsumer();
+
     void commit(); // Commit all processed messages.
     void subscribe(); // Subscribe internal consumer to topics.
     void unsubscribe(); // Unsubscribe internal consumer in case of failure.
@@ -113,11 +117,20 @@ class KafkaConsumer
         rdkafka_stat = stat_json_string;
     }
     void inUse() { in_use = true; }
-    void notInUse() { in_use = false; }
+    void notInUse()
+    {
+        in_use = false;
+        last_used_usec = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
+    }
 
     // For system.kafka_consumers
     Stat getStat() const;
 
+    bool isInUse() const { return in_use; }
+    UInt64 getLastUsedUsec() const { return last_used_usec; }
+
+    std::string getMemberId() const;
+
 private:
     using Messages = std::vector<cppkafka::Message>;
     CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaConsumers};
@@ -132,6 +145,10 @@
         ERRORS_RETURNED
     };
 
+    // order is important, need to be destructed *after* consumer
+    mutable std::mutex rdkafka_stat_mutex;
+    std::string rdkafka_stat;
+
     ConsumerPtr consumer;
     Poco::Logger * log;
     const size_t batch_size = 1;
@@ -145,11 +162,11 @@

     const std::atomic<bool> & stopped;
 
-    // order is important, need to be destructed before consumer
+    // order is important, need to be destructed *before* consumer
     Messages messages;
     Messages::const_iterator current;
 
-    // order is important, need to be destructed before consumer
+    // order is important, need to be destructed *before* consumer
     std::optional<cppkafka::TopicPartitionList> assignment;
     const Names topics;
@@ -168,18 +185,15 @@
     std::atomic<UInt64> num_rebalance_assignments = 0;
     std::atomic<UInt64> num_rebalance_revocations = 0;
     std::atomic<bool> in_use = 0;
-
-    mutable std::mutex rdkafka_stat_mutex;
-    std::string rdkafka_stat;
+    /// Last used time (for TTL)
+    std::atomic<UInt64> last_used_usec = 0;
 
     void drain();
     void cleanUnprocessed();
     void resetIfStopped();
     /// Return number of messages with an error.
     size_t filterMessageErrors();
     ReadBufferPtr getNextMessage();
-
-    std::string getMemberId() const;
 };
 
 }
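
Moving rdkafka_stat_mutex and rdkafka_stat above consumer relies on a language guarantee: class members are destroyed in reverse declaration order, so declaring the mutex before consumer means it outlives the consumer handle, and a stats callback firing during teardown can still lock it safely. A small self-contained demonstration of that ordering rule (illustrative only):

#include <cstdio>

struct StatState { ~StatState() { std::puts("stats state destroyed last"); } };  // like rdkafka_stat_mutex / rdkafka_stat
struct Handle    { ~Handle()    { std::puts("consumer destroyed first"); } };    // like consumer

struct Wrapper
{
    StatState stat_state;  // declared first => destroyed last
    Handle handle;         // declared last  => destroyed first
};

int main()
{
    Wrapper w;
}   // prints "consumer destroyed first", then "stats state destroyed last"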
src/Storages/Kafka/KafkaSettings.cpp (12 additions, 0 deletions)

@@ -11,6 +11,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int UNKNOWN_SETTING;
+    extern const int BAD_ARGUMENTS;
 }
 
 IMPLEMENT_SETTINGS_TRAITS(KafkaSettingsTraits, LIST_OF_KAFKA_SETTINGS)
@@ -38,4 +39,15 @@ void KafkaSettings::loadFromQuery(ASTStorage & storage_def)
     }
 }
 
+void KafkaSettings::sanityCheck() const
+{
+    if (kafka_consumers_pool_ttl_ms < KAFKA_RESCHEDULE_MS)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of 'kafka_consumers_pool_ttl_ms' ({}) cannot be less than the reschedule interval ({})",
+            kafka_consumers_pool_ttl_ms, KAFKA_RESCHEDULE_MS);
+
+    if (kafka_consumers_pool_ttl_ms > KAFKA_CONSUMERS_POOL_TTL_MS_MAX)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of 'kafka_consumers_pool_ttl_ms' ({}) cannot be greater than {}, since this may cause live memory leaks",
+            kafka_consumers_pool_ttl_ms, KAFKA_CONSUMERS_POOL_TTL_MS_MAX);
+}
+
 }
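
The check bounds kafka_consumers_pool_ttl_ms between KAFKA_RESCHEDULE_MS (500 ms, the period at which the cleanup task gets a chance to run at all) and KAFKA_CONSUMERS_POOL_TTL_MS_MAX (600,000 ms, i.e. 10 minutes), so a stale consumer is kept at most ten minutes past its last use. A standalone sketch of the same validation, assuming the constants from KafkaSettings.h below (checkConsumersPoolTTL is an illustrative name):

#include <cstdint>
#include <stdexcept>
#include <string>

constexpr std::uint64_t KAFKA_RESCHEDULE_MS = 500;
constexpr std::uint64_t KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000;

// Free-function rendition of KafkaSettings::sanityCheck() for illustration.
void checkConsumersPoolTTL(std::uint64_t ttl_ms)
{
    if (ttl_ms < KAFKA_RESCHEDULE_MS)
        throw std::invalid_argument("kafka_consumers_pool_ttl_ms (" + std::to_string(ttl_ms)
            + ") cannot be less than the reschedule interval (" + std::to_string(KAFKA_RESCHEDULE_MS) + ")");
    if (ttl_ms > KAFKA_CONSUMERS_POOL_TTL_MS_MAX)
        throw std::invalid_argument("kafka_consumers_pool_ttl_ms (" + std::to_string(ttl_ms)
            + ") cannot exceed " + std::to_string(KAFKA_CONSUMERS_POOL_TTL_MS_MAX)
            + "; overly long TTLs keep stale consumers (and their memory) alive");
}

int main()
{
    checkConsumersPoolTTL(60'000);       // the default: accepted
    try { checkConsumersPoolTTL(100); }  // below one reschedule period: rejected
    catch (const std::invalid_argument &) { /* expected */ }
}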
src/Storages/Kafka/KafkaSettings.h (9 additions, 0 deletions)

@@ -8,6 +8,12 @@ namespace DB
 {
 class ASTStorage;
 
+const auto KAFKA_RESCHEDULE_MS = 500;
+const auto KAFKA_CLEANUP_TIMEOUT_MS = 3000;
+// once per minute leave the poll loop to reschedule (we cannot lock threads in the pool forever)
+const auto KAFKA_MAX_THREAD_WORK_DURATION_MS = 60000;
+// 10 minutes
+const auto KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000;
 
 #define KAFKA_RELATED_SETTINGS(M, ALIAS) \
     M(String, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.", 0) \
Expand All @@ -25,6 +31,7 @@ class ASTStorage;
     /* default is stream_poll_timeout_ms */ \
     M(Milliseconds, kafka_poll_timeout_ms, 0, "Timeout for single poll from Kafka.", 0) \
     M(UInt64, kafka_poll_max_batch_size, 0, "Maximum amount of messages to be polled in a single Kafka poll.", 0) \
+    M(UInt64, kafka_consumers_pool_ttl_ms, 60'000, "TTL for Kafka consumers (in milliseconds)", 0) \
     /* default is stream_flush_interval_ms */ \
     M(Milliseconds, kafka_flush_interval_ms, 0, "Timeout for flushing data from Kafka.", 0) \
     M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \
@@ -53,6 +60,8 @@ DECLARE_SETTINGS_TRAITS(KafkaSettingsTraits, LIST_OF_KAFKA_SETTINGS)
 struct KafkaSettings : public BaseSettings<KafkaSettingsTraits>
 {
     void loadFromQuery(ASTStorage & storage_def);
+
+    void sanityCheck() const;
 };
 
 }
