diff --git a/lib/kafka/message_buffer.rb b/lib/kafka/message_buffer.rb index 5e56ce19d..0e51f1674 100644 --- a/lib/kafka/message_buffer.rb +++ b/lib/kafka/message_buffer.rb @@ -48,6 +48,40 @@ def each end end + def clear_largest_message + target = nil + @buffer.each do |topic, messages_for_topic| + messages_for_topic.each do |partition, messages_for_partition| + messages_for_partition.each do |message| + if target.nil? || target[:message].bytesize.to_i < message.bytesize.to_i + target = { + topic: topic, + partition: partition, + message: message, + } + end + end + end + end + + clear_message(target) unless target.nil? + end + + def clear_message(topic:, partition:, message:) + return if topic.nil? || partition.nil? || message.nil? + return unless @buffer.key?(topic) && @buffer[topic].key?(partition) + return unless @buffer[topic][partition].include?(message) + + @size -= 1 + @bytesize -= message.bytesize + + @buffer[topic][partition].delete(message) + if @buffer[topic][partition].empty? + @buffer[topic].delete(partition) + @buffer.delete(topic) if @buffer[topic].empty? + end + end + # Clears buffered messages for the given topic and partition. # # @param topic [String] the name of the topic. diff --git a/lib/kafka/producer.rb b/lib/kafka/producer.rb index cdd479b7d..4ae4ce28f 100644 --- a/lib/kafka/producer.rb +++ b/lib/kafka/producer.rb @@ -383,7 +383,16 @@ def deliver_messages_with_retries(notification) end assign_partitions! - operation.execute + begin + operation.execute + rescue Kafka::MessageSizeTooLarge => e + @logger.error "Received unrecoverable error #{e.class}: #{e.message}" + @logger.info "Dropping largest message from buffer. Current buffer size: #{@buffer.size}, bytesize: #{@buffer.bytesize}" + @buffer.clear_largest_message + @logger.info "Dropped largest message from buffer. Current buffer size: #{@buffer.size}, bytesize: #{@buffer.bytesize}" + sleep @retry_backoff + retry + end if @required_acks.zero? # No response is returned by the brokers, so we can't know which messages