Create consumers for Kafka tables on fly with TTL (resubmit) #58310

Merged: 15 commits, Dec 30, 2023

Changes from 12 commits
20 changes: 13 additions & 7 deletions src/Common/setThreadName.cpp
@@ -28,25 +28,31 @@ namespace ErrorCodes
static thread_local char thread_name[THREAD_NAME_SIZE]{};


void setThreadName(const char * name)
void setThreadName(const char * name, bool truncate)
{
if (strlen(name) > THREAD_NAME_SIZE - 1)
size_t name_len = strlen(name);
if (!truncate && name_len > THREAD_NAME_SIZE - 1)
throw DB::Exception(DB::ErrorCodes::PTHREAD_ERROR, "Thread name cannot be longer than 15 bytes");

size_t name_capped_len = std::min<size_t>(1 + name_len, THREAD_NAME_SIZE - 1);
char name_capped[THREAD_NAME_SIZE];
memcpy(name_capped, name, name_capped_len);
name_capped[name_capped_len] = '\0';

#if defined(OS_FREEBSD)
pthread_set_name_np(pthread_self(), name);
pthread_set_name_np(pthread_self(), name_capped);
if ((false))
#elif defined(OS_DARWIN)
if (0 != pthread_setname_np(name))
if (0 != pthread_setname_np(name_capped))
#elif defined(OS_SUNOS)
if (0 != pthread_setname_np(pthread_self(), name))
if (0 != pthread_setname_np(pthread_self(), name_capped))
#else
if (0 != prctl(PR_SET_NAME, name, 0, 0, 0))
if (0 != prctl(PR_SET_NAME, name_capped, 0, 0, 0))
#endif
if (errno != ENOSYS && errno != EPERM) /// It's ok if the syscall is unsupported or not allowed in some environments.
throw DB::ErrnoException(DB::ErrorCodes::PTHREAD_ERROR, "Cannot set thread name with prctl(PR_SET_NAME, ...)");

memcpy(thread_name, name, std::min<size_t>(1 + strlen(name), THREAD_NAME_SIZE - 1));
memcpy(thread_name, name_capped, name_capped_len);
}

const char * getThreadName()
4 changes: 3 additions & 1 deletion src/Common/setThreadName.h
@@ -4,7 +4,9 @@
/** Sets the thread name (maximum length is 15 bytes),
* which will be visible in ps, gdb, /proc,
* for convenience of observation and debugging.
*
* @param truncate - if true, the name is truncated to 15 bytes automatically; if false, a longer name throws an exception
*/
void setThreadName(const char * name);
void setThreadName(const char * name, bool truncate = false);

const char * getThreadName();
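
As a usage illustration (not part of this diff; the caller and the "KfkCln" prefix below are hypothetical): code that builds thread names from variable-length identifiers can now opt into truncation instead of handling the exception.

#include <Common/setThreadName.h>
#include <string>

void runKafkaCleanupThread(const std::string & table_name)
{
    /// A prefix plus a table name can easily exceed the 15-byte limit enforced by the kernel.
    /// With truncate = true the name is silently capped at 15 bytes instead of throwing.
    setThreadName(("KfkCln" + table_name).c_str(), /*truncate=*/ true);

    /// With the default truncate = false, the same call would throw DB::Exception
    /// (PTHREAD_ERROR) for any name longer than 15 bytes.
}
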
59 changes: 49 additions & 10 deletions src/Storages/Kafka/KafkaConsumer.cpp
@@ -13,6 +13,7 @@

#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <base/defines.h>

namespace CurrentMetrics
{
@@ -46,15 +47,13 @@ const auto DRAIN_TIMEOUT_MS = 5000ms;


KafkaConsumer::KafkaConsumer(
ConsumerPtr consumer_,
Poco::Logger * log_,
size_t max_batch_size,
size_t poll_timeout_,
bool intermediate_commit_,
const std::atomic<bool> & stopped_,
const Names & _topics)
: consumer(consumer_)
, log(log_)
: log(log_)
, batch_size(max_batch_size)
, poll_timeout(poll_timeout_)
, intermediate_commit(intermediate_commit_)
@@ -63,6 +62,25 @@ KafkaConsumer::KafkaConsumer(
, topics(_topics)
, exceptions_buffer(EXCEPTIONS_DEPTH)
{
}

void KafkaConsumer::createConsumer(cppkafka::Configuration consumer_config)
{
chassert(!consumer.get());

/// Using this should be safe, since cppkafka::Consumer can poll messages
/// (including statistics, which will trigger the callback below) only via
/// KafkaConsumer.
if (consumer_config.get("statistics.interval.ms") != "0")
{
consumer_config.set_stats_callback([this](cppkafka::KafkaHandleBase &, const std::string & stat_json)
{
setRDKafkaStat(stat_json);
});
}
consumer = std::make_shared<cppkafka::Consumer>(consumer_config);
consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);

// called (synchronously, during poll) when we enter the consumer group
consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
{
@@ -135,6 +153,9 @@

KafkaConsumer::~KafkaConsumer()
{
if (!consumer)
return;

try
{
if (!consumer->get_subscription().empty())
@@ -155,6 +176,8 @@
LOG_ERROR(log, "Error while destructing consumer: {}", e.what());
}

/// Reset the consumer explicitly to avoid triggering statistics callback from the dtor
consumer.reset();
}

// Needed to drain rest of the messages / queued callback calls from the consumer
@@ -568,6 +591,9 @@ void KafkaConsumer::setExceptionInfo(const std::string & text, bool with_stacktr
*/
std::string KafkaConsumer::getMemberId() const
{
if (!consumer)
return "";

char * memberid_ptr = rd_kafka_memberid(consumer->get_handle());
std::string memberid_string = memberid_ptr;
rd_kafka_mem_free(nullptr, memberid_ptr);
@@ -578,8 +604,14 @@ std::string KafkaConsumer::getMemberId() const
KafkaConsumer::Stat KafkaConsumer::getStat() const
{
KafkaConsumer::Stat::Assignments assignments;
auto cpp_assignments = consumer->get_assignment();
auto cpp_offsets = consumer->get_offsets_position(cpp_assignments);
cppkafka::TopicPartitionList cpp_assignments;
cppkafka::TopicPartitionList cpp_offsets;

if (consumer)
{
cpp_assignments = consumer->get_assignment();
cpp_offsets = consumer->get_offsets_position(cpp_assignments);
}

for (size_t num = 0; num < cpp_assignments.size(); ++num)
{
@@ -591,7 +623,7 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const
}

return {
.consumer_id = getMemberId() /* consumer->get_member_id() */ ,
.consumer_id = getMemberId(),
.assignments = std::move(assignments),
.last_poll_time = last_poll_timestamp_usec.load(),
.num_messages_read = num_messages_read.load(),
Expand All @@ -601,11 +633,18 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const
.num_commits = num_commits.load(),
.num_rebalance_assignments = num_rebalance_assignments.load(),
.num_rebalance_revocations = num_rebalance_revocations.load(),
.exceptions_buffer = [&](){std::lock_guard<std::mutex> lock(exception_mutex);
return exceptions_buffer;}(),
.exceptions_buffer = [&]()
{
std::lock_guard<std::mutex> lock(exception_mutex);
return exceptions_buffer;
}(),
.in_use = in_use.load(),
.rdkafka_stat = [&](){std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
return rdkafka_stat;}(),
.last_used_usec = last_used_usec.load(),
.rdkafka_stat = [&]()
{
std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
return rdkafka_stat;
}(),
};
}

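
To make the two-step construction concrete, here is a rough caller-side sketch. It is an assumption about how the storage layer drives the new API, not code from this PR; makeLazyConsumer and its parameters are illustrative.

#include <Storages/Kafka/KafkaConsumer.h>

std::shared_ptr<KafkaConsumer> makeLazyConsumer(
    Poco::Logger * log, size_t batch_size, size_t poll_timeout,
    bool intermediate_commit, const std::atomic<bool> & shutdown_called,
    const Names & topics, const cppkafka::Configuration & config)
{
    /// The wrapper is cheap to create; no librdkafka handle exists yet.
    auto consumer = std::make_shared<KafkaConsumer>(
        log, batch_size, poll_timeout, intermediate_commit, shutdown_called, topics);

    /// The underlying cppkafka::Consumer is created only when it is actually needed.
    if (!consumer->hasConsumer())
        consumer->createConsumer(config);

    consumer->inUse();      /// taken from the pool
    /// ... poll, stream rows into the materialized views ...
    consumer->notInUse();   /// also stamps last_used_usec for the TTL logic
    return consumer;
}
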
23 changes: 18 additions & 5 deletions src/Storages/Kafka/KafkaConsumer.h
@@ -57,12 +57,11 @@ class KafkaConsumer
UInt64 num_rebalance_revocations;
KafkaConsumer::ExceptionsBuffer exceptions_buffer;
bool in_use;
UInt64 last_used_usec;
std::string rdkafka_stat;
};

public:
KafkaConsumer(
ConsumerPtr consumer_,
Poco::Logger * log_,
size_t max_batch_size,
size_t poll_timeout_,
@@ -72,6 +71,11 @@ class KafkaConsumer
);

~KafkaConsumer();

void createConsumer(cppkafka::Configuration consumer_config);
bool hasConsumer() const { return consumer.get() != nullptr; }
ConsumerPtr && moveConsumer() { return std::move(consumer); }

void commit(); // Commit all processed messages.
void subscribe(); // Subscribe internal consumer to topics.
void unsubscribe(); // Unsubscribe internal consumer in case of failure.
@@ -113,11 +117,20 @@ class KafkaConsumer
rdkafka_stat = stat_json_string;
}
void inUse() { in_use = true; }
void notInUse() { in_use = false; }
void notInUse()
{
in_use = false;
last_used_usec = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}

// For system.kafka_consumers
Stat getStat() const;

bool isInUse() const { return in_use; }
UInt64 getLastUsedUsec() const { return last_used_usec; }

std::string getMemberId() const;

private:
using Messages = std::vector<cppkafka::Message>;
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaConsumers};
@@ -168,6 +181,8 @@ class KafkaConsumer
std::atomic<UInt64> num_rebalance_assignments = 0;
std::atomic<UInt64> num_rebalance_revocations = 0;
std::atomic<bool> in_use = 0;
/// Last used time (for TTL)
std::atomic<UInt64> last_used_usec = 0;

mutable std::mutex rdkafka_stat_mutex;
std::string rdkafka_stat;
@@ -178,8 +193,6 @@
/// Return number of messages with an error.
size_t filterMessageErrors();
ReadBufferPtr getNextMessage();

std::string getMemberId() const;
};

}
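
The in_use / last_used_usec pair is what enables TTL-based cleanup. Below is a minimal reaper sketch, assuming a plain vector of consumers and a TTL already converted to microseconds; neither appears in this diff, and the PR's real cleanup presumably lives in StorageKafka.

#include <chrono>
#include <memory>
#include <vector>
#include <Storages/Kafka/KafkaConsumer.h>

void cleanupIdleConsumers(std::vector<std::shared_ptr<KafkaConsumer>> & consumers, UInt64 ttl_usec)
{
    UInt64 now_usec = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::system_clock::now().time_since_epoch()).count();

    for (auto & consumer : consumers)
    {
        if (consumer->isInUse() || !consumer->hasConsumer())
            continue;

        if (now_usec - consumer->getLastUsedUsec() > ttl_usec)
        {
            /// moveConsumer() releases the cppkafka::Consumer; destroying it closes the
            /// librdkafka handle, while the lightweight wrapper stays around and can
            /// recreate a consumer later via createConsumer().
            auto released = consumer->moveConsumer();
        }
    }
}
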
12 changes: 12 additions & 0 deletions src/Storages/Kafka/KafkaSettings.cpp
@@ -11,6 +11,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
extern const int BAD_ARGUMENTS;
}

IMPLEMENT_SETTINGS_TRAITS(KafkaSettingsTraits, LIST_OF_KAFKA_SETTINGS)
@@ -38,4 +39,15 @@ void KafkaSettings::loadFromQuery(ASTStorage & storage_def)
}
}

void KafkaSettings::sanityCheck() const
{
if (kafka_consumers_pool_ttl_ms < KAFKA_RESCHEDULE_MS)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of 'kafka_consumers_pool_ttl_ms' ({}) cannot be less then rescheduled interval ({})",
kafka_consumers_pool_ttl_ms, KAFKA_RESCHEDULE_MS);

if (kafka_consumers_pool_ttl_ms > KAFKA_CONSUMERS_POOL_TTL_MS_MAX)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of 'kafka_consumers_pool_ttl_ms' ({}) cannot be too big (greater then {}), since this may cause live memory leaks",
kafka_consumers_pool_ttl_ms, KAFKA_CONSUMERS_POOL_TTL_MS_MAX);
}

}
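
For orientation, an assumption about the call site (which this diff does not show): sanityCheck() is presumably invoked right after the settings are parsed from the CREATE query, so an out-of-range TTL fails table creation instead of surfacing later.

/// Hypothetical call-site sketch.
void applyKafkaSettings(ASTStorage & storage_def)
{
    KafkaSettings kafka_settings;
    kafka_settings.loadFromQuery(storage_def);  /// e.g. SETTINGS kafka_consumers_pool_ttl_ms = 100
    kafka_settings.sanityCheck();               /// throws BAD_ARGUMENTS: 100 ms is below KAFKA_RESCHEDULE_MS (500 ms)
}
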
9 changes: 9 additions & 0 deletions src/Storages/Kafka/KafkaSettings.h
@@ -8,6 +8,12 @@ namespace DB
{
class ASTStorage;

const auto KAFKA_RESCHEDULE_MS = 500;
const auto KAFKA_CLEANUP_TIMEOUT_MS = 3000;
// once per minute, leave the loop and reschedule (we can't lock threads in the pool forever)
const auto KAFKA_MAX_THREAD_WORK_DURATION_MS = 60000;
// 10min
const auto KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000;

#define KAFKA_RELATED_SETTINGS(M, ALIAS) \
M(String, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.", 0) \
@@ -25,6 +31,7 @@ class ASTStorage;
/* default is stream_poll_timeout_ms */ \
M(Milliseconds, kafka_poll_timeout_ms, 0, "Timeout for single poll from Kafka.", 0) \
M(UInt64, kafka_poll_max_batch_size, 0, "Maximum amount of messages to be polled in a single Kafka poll.", 0) \
M(UInt64, kafka_consumers_pool_ttl_ms, 60'000, "TTL for Kafka consumers (in milliseconds)", 0) \
/* default is stream_flush_interval_ms */ \
M(Milliseconds, kafka_flush_interval_ms, 0, "Timeout for flushing data from Kafka.", 0) \
M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \
@@ -53,6 +60,8 @@ DECLARE_SETTINGS_TRAITS(KafkaSettingsTraits, LIST_OF_KAFKA_SETTINGS)
struct KafkaSettings : public BaseSettings<KafkaSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);

void sanityCheck() const;
};

}
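
Tying the new setting to the consumer-side bookkeeping (illustrative only): kafka_consumers_pool_ttl_ms is expressed in milliseconds and bounded by sanityCheck() to [KAFKA_RESCHEDULE_MS, KAFKA_CONSUMERS_POOL_TTL_MS_MAX] = [500, 600'000] ms, while KafkaConsumer::getLastUsedUsec() reports microseconds, so a cleanup loop has to convert before comparing. A hypothetical helper:

UInt64 consumersPoolTTLUsec(const KafkaSettings & settings)
{
    /// Default is 60'000 ms (one minute); sanityCheck() keeps the value within [500, 600'000] ms.
    return static_cast<UInt64>(settings.kafka_consumers_pool_ttl_ms.value) * 1000;
}
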