# Reconstructed from a `git format-patch` mail whose line breaks had been lost.
#
#   From 1c264d5b20875966a7888b42df9df4ba50fd490b Mon Sep 17 00:00:00 2001
#   From: Ryan Amos
#   Date: Wed, 8 Jul 2020 13:50:09 -0700
#   Subject: [PATCH] Drop largest message from buffer on
#     Kafka::MessageSizeTooLarge exception.
#
#   We can get into a bad unrecoverable state when we get a
#   Kafka::MessageSizeTooLarge exception. When we receive this exception, we
#   remove the largest message from the buffer and simply retry. Once retried,
#   if the buffer is still too large, we'll continue removing the largest
#   message until we can successfully flush the buffer.
#
#   Original diffstat:
#     lib/kafka/message_buffer.rb | 34 ++++++++++++++++++++++++++++++++++
#     lib/kafka/producer.rb       | 11 ++++++++++-
#     2 files changed, 44 insertions(+), 1 deletion(-)
#
# Target of the methods below: lib/kafka/message_buffer.rb
# (index 5e56ce19d..0e51f1674, hunk @@ -48,6 +48,40 @@). They are inserted
# after `each` and before the existing comment "Clears buffered messages for
# the given topic and partition."

# Removes the single largest buffered message (by +bytesize+).
#
# Walks every topic/partition list in +@buffer+ and picks the message with the
# greatest +bytesize+. When several messages share the greatest size, the one
# encountered first is chosen.
#
# @return [Object, nil] the message that was removed, or nil when the buffer
#   holds no messages.
def clear_largest_message
  largest = nil

  @buffer.each do |topic, messages_for_topic|
    messages_for_topic.each do |partition, messages_for_partition|
      messages_for_partition.each do |message|
        # Strict comparison: an equally sized later message never replaces
        # the current candidate.
        next unless largest.nil? || message.bytesize.to_i > largest[:message].bytesize.to_i

        largest = { topic: topic, partition: partition, message: message }
      end
    end
  end

  # Splat explicitly: clear_message only accepts keywords, and Ruby 3.0+ no
  # longer converts a positional Hash into keyword arguments.
  clear_message(**largest) if largest
end

# Removes one occurrence of +message+ from the given topic and partition and
# adjusts +@size+ and +@bytesize+ accordingly.
#
# Does nothing when any argument is nil, when the topic/partition is not
# present in +@buffer+, or when the message is not buffered there. Partition
# and topic entries that become empty are pruned from +@buffer+.
#
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition id.
# @param message [Object] the buffered message to remove (matched with +==+).
# @return [Object, nil] the removed message, or nil when nothing was removed.
def clear_message(topic:, partition:, message:)
  return if topic.nil? || partition.nil? || message.nil?
  return unless @buffer.key?(topic) && @buffer[topic].key?(partition)

  messages = @buffer[topic][partition]
  position = messages.index(message)
  return if position.nil?

  # delete_at removes exactly one element. Array#delete would drop every
  # element equal to +message+ while the counters below account for one only.
  removed = messages.delete_at(position)
  @size -= 1
  @bytesize -= removed.bytesize

  if messages.empty?
    @buffer[topic].delete(partition)
    @buffer.delete(topic) if @buffer[topic].empty?
  end

  removed
end
diff --git a/lib/kafka/producer.rb b/lib/kafka/producer.rb index cdd479b7d..4ae4ce28f 100644 --- a/lib/kafka/producer.rb +++ b/lib/kafka/producer.rb @@ -383,7 +383,16 @@ def deliver_messages_with_retries(notification) end assign_partitions! - operation.execute + begin + operation.execute + rescue Kafka::MessageSizeTooLarge => e + @logger.error "Received unrecoverable error #{e.class}: #{e.message}" + @logger.info "Dropping largest message from buffer. Current buffer size: #{@buffer.size}, bytesize: #{@buffer.bytesize}" + @buffer.clear_largest_message + @logger.info "Dropped largest message from buffer. Current buffer size: #{@buffer.size}, bytesize: #{@buffer.bytesize}" + sleep @retry_backoff + retry + end if @required_acks.zero? # No response is returned by the brokers, so we can't know which messages