diff --git a/.travis.yml b/.travis.yml index f9c953c..2b61e2b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,7 @@ bundler_args: --without development rvm: - jruby env: - - JRUBY_OPTS="$JRUBY_OPTS -Xcli.debug=true --debug" + - JRUBY_OPTS="$JRUBY_OPTS -Xcli.debug=true --debug -J-Xms512m -J-Xmx512m" before_install: - password=`openssl rsautl -decrypt -inkey ~/.ssh/id_rsa -in config/travis/secret.rsaenc` - openssl aes-256-cbc -k "$password" -in config/travis/burtbot-travis-id_rsa.aes256enc -d -a -out id_private diff --git a/README.md b/README.md index 7c836a4..d790fbb 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,8 @@ track the progress of a reassignment. The `shuffle` subcommand can either perform a random reassignment of partitions, or it can use [rendezvous hashing](http://en.wikipedia.org/wiki/Rendezvous_hashing), which will minimize the number of partitions that has to move between replicas -when adding or removing brokers. +when adding or removing brokers. There is also rack aware rendezvous hashing, +which will ensure that data is properly replicated across racks. To start a random reassignment of the replica assignment of partitions matching `^test.*`, but leaving the rest alone: @@ -62,6 +63,10 @@ To do the same thing but using rendezvous hashing: ```shell $ ktl cluster shuffle '^test.*' -R -z localhost:2181/test ``` +To do the same thing using rack aware rendezvous hashing: +```shell +$ ktl cluster shuffle '^test.*' -a -z localhost:2181/test +``` #### Dealing with ZooKeeper's znode limit diff --git a/ktl.gemspec b/ktl.gemspec index 432d8e8..719522d 100644 --- a/ktl.gemspec +++ b/ktl.gemspec @@ -22,6 +22,6 @@ Gem::Specification.new do |s| s.platform = 'java' - s.add_runtime_dependency 'heller', '~> 0.2' + s.add_runtime_dependency 'kafka-jars', '= 0.10.0.1' s.add_runtime_dependency 'thor', '~> 0', '< 1.0' end diff --git a/lib/ext/kafka.rb b/lib/ext/kafka.rb index a445694..1a6ff8e 100644 --- a/lib/ext/kafka.rb +++ b/lib/ext/kafka.rb @@ -2,7 +2,6 @@ require 'kafka-jars' - module Log4j include_package 'org.apache.log4j' java_import 'org.apache.log4j.Logger' @@ -22,6 +21,7 @@ module Exception module Scala java_import 'scala.Console' java_import 'scala.Tuple2' + java_import 'scala.Option' class Tuple2 alias_method :first, :_1 @@ -68,15 +68,24 @@ def self.new_zk_client(zk_connect, timeout=30_000) def self.get_partitions_for_topic(zk, topic) topics = Scala::Collection::Immutable::List.from_array([topic].to_java) - partitions = ZkUtils.get_partitions_for_topics(zk, topics) + partitions = zk.get_partitions_for_topics(topics) partitions.get(topic).get end def self.delete_topic(zk, topic) - ZkUtils.create_persistent_path(zk, ZkUtils.get_delete_topic_path(topic), '') + acl = Kafka::Utils::ZkUtils::DefaultAcls(false) + zk.create_persistent_path(ZkUtils.get_delete_topic_path(topic), '', acl) end end + module Api + include_package 'kafka.api' + end + + module Cluster + include_package 'kafka.cluster' + end + module Admin include_package 'kafka.admin' @@ -101,9 +110,27 @@ def self.preferred_replica(zk_client, topics_partitions) end def self.assign_replicas_to_brokers(brokers, partitions, repl_factor, index=-1, partition=-1) - assignment = AdminUtils.assign_replicas_to_brokers(brokers, partitions, repl_factor, index, partition) + assignment = AdminUtils.assign_replicas_to_brokers(brokers, partitions.to_java(:int), repl_factor.to_java(:int), index.to_java(:int), partition.to_java(:int)) ScalaEnumerable.new(assignment) end + + def self.get_broker_metadatas(zk_client, brokers, force_rack = true) + rack_aware = if force_rack + JRuby.runtime.jruby_class_loader.load_class('kafka.admin.RackAwareMode$Enforced$').get_declared_field('MODULE$').get(nil) + else + JRuby.runtime.jruby_class_loader.load_class('kafka.admin.RackAwareMode$Safe$').get_declared_field('MODULE$').get(nil) + end + broker_metadatas = Kafka::Admin::AdminUtils.get_broker_metadatas( + zk_client.utils, + rack_aware, + Scala::Option[Scala::Collection::JavaConversions.as_scala_iterable(brokers).to_list] + ) + Scala::Collection::JavaConversions.seq_as_java_list(broker_metadatas).to_a + end + end + + module Protocol + java_import 'org.apache.kafka.common.protocol.SecurityProtocol' end module Common diff --git a/lib/ktl.rb b/lib/ktl.rb index 299b7ba..affffc3 100644 --- a/lib/ktl.rb +++ b/lib/ktl.rb @@ -1,8 +1,11 @@ # encoding: utf-8 +unless defined?($SLF4J_BACKEND) + $SLF4J_BACKEND = 'log4j12' +end + require 'thor' require 'json' -require 'heller' require 'logger' require 'ext/kafka' require 'ext/thor' @@ -36,9 +39,7 @@ def warn?; false end require 'ktl/command' require 'ktl/cluster' require 'ktl/cluster_stats_task' -require 'ktl/consumer' require 'ktl/decommission_plan' -require 'ktl/kafka_client' require 'ktl/migration_plan' require 'ktl/reassigner' require 'ktl/reassignment_progress' diff --git a/lib/ktl/cli.rb b/lib/ktl/cli.rb index 4ecf822..a3533e5 100644 --- a/lib/ktl/cli.rb +++ b/lib/ktl/cli.rb @@ -2,9 +2,6 @@ module Ktl class Cli < Thor - desc 'consumer SUBCOMMAND ...ARGS', 'Commands for managing consumers' - subcommand 'consumer', Consumer - desc 'cluster SUBCOMMAND ...ARGS', 'Commands for managing a cluster' subcommand 'cluster', Cluster diff --git a/lib/ktl/cluster.rb b/lib/ktl/cluster.rb index 35ede08..2e9c7e0 100644 --- a/lib/ktl/cluster.rb +++ b/lib/ktl/cluster.rb @@ -45,12 +45,19 @@ def migrate_broker option :brokers, type: :array, desc: 'Broker IDs' option :blacklist, type: :array, desc: 'Broker IDs to exclude' option :rendezvous, aliases: %w[-R], type: :boolean, desc: 'Whether to use Rendezvous-hashing based shuffle' + option :rack_aware, aliases: %w[-a], type: :boolean, desc: 'Whether to use Rack aware + Rendezvous-hashing based shuffle' option :replication_factor, aliases: %w[-r], type: :numeric, desc: 'Replication factor to use' option :limit, aliases: %w[-l], type: :numeric, desc: 'Max number of partitions to reassign at a time' option :zookeeper, aliases: %w[-z], required: true, desc: 'ZooKeeper URI' def shuffle(regexp='.*') with_zk_client do |zk_client| - plan_factory = options.rendezvous ? RendezvousShufflePlan : ShufflePlan + plan_factory = if options.rack_aware + RackAwareShufflePlan + elsif options.rendezvous + RendezvousShufflePlan + else + ShufflePlan + end plan = plan_factory.new(zk_client, { filter: Regexp.new(regexp), brokers: options.brokers, diff --git a/lib/ktl/cluster_stats_task.rb b/lib/ktl/cluster_stats_task.rb index 6f32427..26bffda 100644 --- a/lib/ktl/cluster_stats_task.rb +++ b/lib/ktl/cluster_stats_task.rb @@ -19,7 +19,7 @@ def execute brokers.foreach do |broker| leader_for = ownership[broker.id] share = leader_for.fdiv(partitions.size.to_f) * 100 - @shell.say ' - %d (%s) leader for %d partitions (%.2f %%)' % [broker.id, broker.host, leader_for, share] + @shell.say ' - %d leader for %d partitions (%.2f %%)' % [broker.id, leader_for, share] end end diff --git a/lib/ktl/command.rb b/lib/ktl/command.rb index 109e125..b53e87c 100644 --- a/lib/ktl/command.rb +++ b/lib/ktl/command.rb @@ -13,18 +13,12 @@ def with_zk_client rescue => e logger.error '%s (%s)' % [e.message, e.class.name] logger.debug e.backtrace.join($/) + $stderr.puts '%s (%s)' % [e.message, e.class.name] + $stderr.puts e.backtrace.join($/) ensure zk_client.close if zk_client end - def with_kafka_client(options={}) - brokers = with_zk_client { |zk_client| ScalaEnumerable.new(zk_client.brokers).map(&:connection_string) } - kafka_client = KafkaClient.create(options.merge(hosts: brokers)) - yield kafka_client - ensure - kafka_client.close if kafka_client - end - def logger @logger ||= Logger.new($stdout).tap do |log| log.formatter = ShellFormater.new(shell) diff --git a/lib/ktl/consumer.rb b/lib/ktl/consumer.rb deleted file mode 100644 index 3c8084f..0000000 --- a/lib/ktl/consumer.rb +++ /dev/null @@ -1,14 +0,0 @@ -# encoding: utf-8 - -module Ktl - class Consumer < Command - desc 'lag GROUP_NAME', 'Check lag of a consumer group' - option :topics, type: :array, aliases: %w[-t], default: [], desc: 'List of topics to include (or all if none given)' - option :zookeeper, aliases: %w[-z], required: true, desc: 'ZooKeeper URI' - def lag(group_name) - args = %W[--zookeeper #{options.zookeeper} --group #{group_name}] - args << '--topic' << options.topics.join(',') if options.topics.any? - Kafka::Tools::ConsumerOffsetChecker.main(args.to_java(:String)) - end - end -end diff --git a/lib/ktl/kafka_client.rb b/lib/ktl/kafka_client.rb deleted file mode 100644 index ba3950e..0000000 --- a/lib/ktl/kafka_client.rb +++ /dev/null @@ -1,118 +0,0 @@ -# encoding: utf-8 - -module Ktl - class KafkaClient - def self.create(config) - new(config).setup - end - - def initialize(config) - @hosts = config[:hosts] - @backoff = (config[:backoff] || 1).to_i * 1000 - @max_offsets = config[:max_offsets] || 10 - @logger = config[:logger] || NullLogger.new - @consumer_impl = config[:consumer_impl] || Heller::Consumer - @sleeper = config[:sleeper] || Java::JavaLang::Thread - @consumers = JavaConcurrent::ConcurrentHashMap.new - @mutex = Mutex.new - end - - def setup - @logger.info(sprintf('Connecting to Kafka at %s', @hosts.join(','))) - @hosts.each do |connection_string| - @consumers[connection_string] = @consumer_impl.new(connection_string) - end - self - end - - def partitions(filter=/.*/) - result = Hash.new { |h, k| h[k] = [] } - metadata = fetch_metadata - metadata.each do |topic, partition_metadata| - result[topic] << partition_metadata.partition_id if topic =~ filter - end - result - end - - def earliest_offset(topic_partitions) - fetch_offsets(topic_partitions, Heller::OffsetRequest.earliest_time) { |offsets| offsets.min } - end - - def latest_offset(topic_partitions) - fetch_offsets(topic_partitions, Heller::OffsetRequest.latest_time) { |offsets| offsets.max } - end - - def offset_before(topic_partitions, time) - fetch_offsets(topic_partitions, time.to_i * 1000) { |offsets| offsets.max } - end - - def close - @consumers.each_value(&:close) - end - - private - - def fetch_metadata - consumer = random_consumer - consumer.metadata - end - - def random_consumer - @consumers.values.sample - end - - def consumer_for(connection_string) - if (consumer = @consumers[connection_string]) - consumer - else - consumer = @consumer_impl.new(connection_string) - @consumers[connection_string] = consumer - consumer - end - end - - def build_offset_requests(topic_partitions, timestamp) - requests = Hash.new { |h, k| h[k] = [] } - metadata = fetch_metadata - topic_partitions.each do |topic, partitions| - partitions.each do |partition| - broker = metadata.leader_for(topic, partition) - consumer = consumer_for(broker.connection_string) - request = Heller::OffsetRequest.new(topic, partition, timestamp, @max_offsets) - requests[consumer] << request - end - end - requests - end - - def fetch_offsets(topic_partitions, timestamp, &extracter) - result = Hash.new { |h, k| h[k] = {} } - consumer_requests = build_offset_requests(topic_partitions, timestamp) - until consumer_requests.empty? do - failed = Hash.new { |h, k| h[k] = [] } - consumer_requests.each do |consumer, requests| - response = consumer.offsets_before(requests) - requests.each do |request| - topic, partition = request.topic, request.partition - if response.error(topic, partition) > 0 - failed[topic] << partition - else - offsets = response.offsets(topic, partition) - offsets = extracter.call(offsets) if block_given? - result[topic][partition] = offsets - end - end - end - break if failed.empty? - @logger.debug do - topics = failed.keys.size - partitions = failed.values.map(&:size).reduce(:+) - sprintf('Failed to find metadata for %d topics (%d partitions), retrying...', topics, partitions) - end - @sleeper.sleep(@backoff) - consumer_requests = build_offset_requests(failed, timestamp) - end - result - end - end -end diff --git a/lib/ktl/migration_plan.rb b/lib/ktl/migration_plan.rb index 625d7d5..4a678ee 100644 --- a/lib/ktl/migration_plan.rb +++ b/lib/ktl/migration_plan.rb @@ -4,8 +4,8 @@ module Ktl class MigrationPlan def initialize(zk_client, old_leader, new_leader) @zk_client = zk_client - @old_leader = old_leader - @new_leader = new_leader + @old_leader = old_leader.to_java + @new_leader = new_leader.to_java end def generate diff --git a/lib/ktl/reassigner.rb b/lib/ktl/reassigner.rb index c1c817a..1eb8a5e 100644 --- a/lib/ktl/reassigner.rb +++ b/lib/ktl/reassigner.rb @@ -89,11 +89,15 @@ def write_overflow(reassignments) end def reassignment_json(reassignment) - Kafka::Utils::ZkUtils.get_partition_reassignment_zk_data(reassignment) + zk_utils.format_as_reassignment_json(reassignment) end def parse_reassignment_json(json) - Kafka::Utils::ZkUtils.parse_partition_reassignment_data(json) + zk_utils.parse_partition_reassignment_data(json) + end + + def zk_utils + @zk_utils ||= Kafka::Utils::ZkUtils.new(nil, nil, false) end def maybe_split_by_limit(reassignment, limit=nil) diff --git a/lib/ktl/shuffle_plan.rb b/lib/ktl/shuffle_plan.rb index fbd6d53..f258153 100644 --- a/lib/ktl/shuffle_plan.rb +++ b/lib/ktl/shuffle_plan.rb @@ -42,8 +42,9 @@ def select_brokers end def assign_replicas_to_brokers(topic, brokers, partition_count, replica_count) - brokers = Scala::Collection::JavaConversions.as_scala_iterable(brokers.map { |x| x.to_java(:int) }).to_list - Kafka::Admin.assign_replicas_to_brokers(brokers, partition_count, replica_count) + broker_metadatas = brokers.map { |x| Kafka::Admin::BrokerMetadata.new(x.to_java(:int), Scala::Option[nil]) } + broker_metadatas = Scala::Collection::JavaConversions.as_scala_iterable(broker_metadatas).to_seq + Kafka::Admin.assign_replicas_to_brokers(broker_metadatas, partition_count, replica_count) rescue Kafka::Admin::AdminOperationException => e raise ArgumentError, sprintf('%s (%s)', e.message, e.class.name), e.backtrace end @@ -70,4 +71,62 @@ def assign_replicas_to_brokers(topic, brokers, partition_count, replica_count) SEED = 1683520333 end + + class RackAwareShufflePlan < RendezvousShufflePlan + def initialize(*args) + super + @rack_mappings = {} + end + + def assign_replicas_to_brokers(topic, brokers, partition_count, replica_count) + if replica_count > brokers.size + raise ArgumentError, sprintf('replication factor: %i larger than available brokers: %i', replica_count, brokers.size) + end + result = [] + racks = brokers.each_with_object({}) do |broker, acc| + rack = rack_for(broker) + acc[rack] ||= [] + acc[rack] << broker + end + partition_count.times do |partition| + first_sorted = racks.flat_map do |rack, rack_brokers| + hashed_brokers = rack_brokers.map do |broker| + key = [partition, topic, broker].pack('l e + if e.message.match '--disable-rack-aware' + raise "Not all brokers have rack information. Unable to create rack aware shuffle plan." + else + raise e + end + end + end end diff --git a/lib/ktl/topic.rb b/lib/ktl/topic.rb index 8972f8f..f4095a9 100644 --- a/lib/ktl/topic.rb +++ b/lib/ktl/topic.rb @@ -32,6 +32,7 @@ def describe(regexp=nil) option :partitions, aliases: %w[-p], default: '1', desc: 'Partitions for new topic(s)' option :replication_factor, aliases: %w[-r], default: '1', desc: 'Replication factor for new topic(s)' option :replica_assignment, aliases: %w[-a], desc: 'Manual replica assignment' + option :disable_rack_aware, desc: 'Disable rack awareness' option :config, aliases: %w[-c], desc: 'Key-value pairs of configuration options', type: :hash, default: {} option :zookeeper, aliases: %w[-z], required: true, desc: 'ZooKeeper URI' def create(*names) @@ -82,21 +83,6 @@ def delete(regexp) end end - desc 'reaper [REGEXP]', 'Delete empty topics (optionally matching regexp)' - option :zookeeper, aliases: %w[-z], required: true, desc: 'ZooKeeper URI' - option :parallel, aliases: %w[-p], desc: 'Number of topics to delete in parallel', type: :numeric, default: 10 - option :delay, aliases: %w[-d], desc: 'Delay between deletes', type: :numeric - def reaper(regexp='.*') - with_kafka_client do |kafka_client| - with_zk_client do |zk_client| - reaper_options = options.merge(logger: logger) - regexp = Regexp.new(regexp) - reaper = TopicReaper.new(kafka_client, zk_client, regexp, reaper_options) - reaper.execute - end - end - end - desc 'alter REGEXP', 'Alter topic configuration' option :add, aliases: %w[-a], desc: 'Key-value pairs of config options to add', type: :hash, default: {} option :remove, aliases: %w[-r], desc: 'Key-value pairs of config options to remove', type: :array, default: [] diff --git a/lib/ktl/version.rb b/lib/ktl/version.rb index 2a4f928..54182f7 100644 --- a/lib/ktl/version.rb +++ b/lib/ktl/version.rb @@ -1,5 +1,5 @@ # encoding: utf-8 module Ktl - VERSION = '1.0.0.pre0'.freeze + VERSION = '1.0.0'.freeze end diff --git a/lib/ktl/zookeeper_client.rb b/lib/ktl/zookeeper_client.rb index bf78ed4..19fc556 100644 --- a/lib/ktl/zookeeper_client.rb +++ b/lib/ktl/zookeeper_client.rb @@ -2,45 +2,46 @@ module Ktl class ZookeeperClient + attr_reader :utils + def initialize(uri, options={}) @uri = uri @threadpool = options[:threadpool] || JavaConcurrent::Executors.new_fixed_thread_pool(CONCURRENCY) - @utils = options[:utils] || Kafka::Utils::ZkUtils + @utils = options[:utils] || Kafka::Utils::ZkUtils.apply(@uri, 5000, 5000, false) end def setup - @client = Kafka::Utils.new_zk_client(@uri) @submit = @threadpool.java_method(:submit, [java.lang.Class.for_name('java.util.concurrent.Callable')]) self end def close @threadpool.shutdown_now if @threadpool - @client.close if @client + @utils.close end def raw_client - @client + @utils end def all_partitions - @utils.get_all_partitions(@client) + @utils.get_all_partitions end def all_topics - @utils.get_all_topics(@client) + @utils.get_all_topics end def brokers - @utils.get_all_brokers_in_cluster(@client) + @utils.get_all_brokers_in_cluster end def broker_ids - @utils.get_sorted_broker_list(@client) + @utils.get_sorted_broker_list end def leader_and_isr_for(partitions) - request(:get_partition_leader_and_isr_for_topics, partitions) + @utils.get_partition_leader_and_isr_for_topics(@utils.class.create_zk_client(@uri, 5_000, 5_000), partitions) end def partitions_for_topics(topics) @@ -52,46 +53,50 @@ def replica_assignment_for_topics(topics) end def partitions_being_reassigned - @utils.get_partitions_being_reassigned(@client) + @utils.get_partitions_being_reassigned end def reassign_partitions(json) - @utils.create_persistent_path(@client, @utils.reassign_partitions_path, json) + @utils.create_persistent_path(@utils.class.reassign_partitions_path, json, no_acl) end def create_znode(path, data='') - @utils.create_persistent_path(@client, path, data) + @utils.create_persistent_path(path, data, no_acl) end def delete_znode(path, options={}) if options[:recursive] - @utils.delete_path_recursive(@client, path) + @utils.delete_path_recursive(path) else - @utils.delete_path(@client, path) + @utils.delete_path(path) end end def read_data(path) - @utils.read_data(@client, path) + @utils.read_data(path) end def get_children(path) - @utils.get_children(@client, path) + @utils.get_children(path) end def exists?(path) - @utils.path_exists(@client, path) + @utils.path_exists(path) end private CONCURRENCY = 8 + def no_acl + Kafka::Utils::ZkUtils::DefaultAcls(false) + end + def request(method, input) chunk_size = [(input.size.to_f / CONCURRENCY).round, 1].max groups = ScalaEnumerable.new(input.grouped(chunk_size).to_seq) futures = groups.map do |slice| - @submit.call { @utils.send(method, @client, slice) } + @submit.call { @utils.send(method, slice) } end merge(futures.map(&:get)) end diff --git a/spec/integration/ktl_cluster_spec.rb b/spec/integration/ktl_cluster_spec.rb index 1c0e7b9..ce93212 100644 --- a/spec/integration/ktl_cluster_spec.rb +++ b/spec/integration/ktl_cluster_spec.rb @@ -12,7 +12,7 @@ end before do - register_broker(0, 'test-host-0') + register_broker(0) end describe 'stats' do @@ -26,7 +26,7 @@ expect(output).to include('Cluster status:') expect(output).to include('topics: 1 (2 partitions)') expect(output).to include('brokers: 1') - expect(output).to include('- 0 (test-host-0) leader for 2 partitions (100.00 %)') + expect(output).to include('- 0 leader for 2 partitions (100.00 %)') end end @@ -39,7 +39,7 @@ {topic: 'topic1', partition: 0, replicas: [1]} ] }.to_json - Kafka::Utils::ZkUtils.create_persistent_path(ktl_zk, '/ktl/overflow/0', overflow_json) + ktl_zk.create_persistent_path('/ktl/overflow/0', overflow_json, no_acl) end before do diff --git a/spec/integration/ktl_consumer_spec.rb b/spec/integration/ktl_consumer_spec.rb deleted file mode 100644 index 11341da..0000000 --- a/spec/integration/ktl_consumer_spec.rb +++ /dev/null @@ -1,92 +0,0 @@ -# encoding: utf-8 - -require 'spec_helper' - - -describe 'bin/ktl consumer' do - include_context 'integration setup' - - let :consumers_count do - 1 - end - - let :broker_port do - 9192 - end - - let :broker do - Kafka::Test.create_kafka_server({ - 'broker.id' => 0, - 'port' => broker_port, - 'zookeeper.connect' => zk_uri + zk_chroot, - 'log.flush.interval.messages' => 5, - }) - end - - let :consumers do - consumers_count.times.map do |index| - Heller::ZookeeperConsumer.new(zk_uri + zk_chroot, { - group_id: 'ktl-test-consumers', - consumer_id: 'ktl-test-consumer-' + index.to_s, - rebalance_retries: 20, - rebalance_retry_backoff: 100, - auto_reset_offset: :smallest, - fetch_max_wait: 100, - timeout: 1000, - socket_timeout: 1000, - auto_commit_interval: 500, - }) - end - end - - let :topics do - 2.times.map { |i| %(ktl-test-topic#{i}) } - end - - before do - clear_zk_chroot - broker.start - topics.each do |topic| - create_topic(%W[#{topic} --partitions 8]) - end - wait_until_topics_exist(%(localhost:#{broker_port}), topics) - topics.each do |topic| - publish_messages(%(localhost:#{broker_port}), topic, 20) - end - end - - after do - broker.shutdown - end - - describe 'lag' do - let :streams do - consumers.flat_map { |c| c.create_streams_by_filter('ktl-test-topic.*', 1) }.map(&:iterator) - end - - let :output do - capture { run(%w[consumer lag ktl-test-consumers], zk_args) }.split("\n") - end - - before do - streams.each do |stream| - 3.times { stream.next } - end - consumers.each(&:commit) - end - - after do - consumers.each(&:close) - end - - it 'prints a table with progress information about given consumer group' do - output.shift # get rid of header - output.each do |line| - line = line.split - expect(line[0]).to eq('ktl-test-consumers') - expect(line[1]).to satisfy { |topic| topics.include?(topic) } - expect(line[6]).to eq('ktl-test-consumers_ktl-test-consumer-0-0') - end - end - end -end diff --git a/spec/integration/ktl_topic_spec.rb b/spec/integration/ktl_topic_spec.rb index e1670aa..b11102a 100644 --- a/spec/integration/ktl_topic_spec.rb +++ b/spec/integration/ktl_topic_spec.rb @@ -13,12 +13,12 @@ describe 'list' do before do - create_topic('topic1', %w[topic1 --partitions 1 --replication-factor 1]) + create_topic(%w[topic1 --partitions 1 --replication-factor 1]) end it 'lists current topics to $stdout' do output = capture { run(%w[topic list], zk_args) } - expect(output).to match('topic1') + expect(output.strip).to eq('topic1') end end @@ -72,13 +72,13 @@ end it 'uses given number of partitions' do - partitions = Kafka::Utils.get_partitions_for_topic(ktl_zk, 'topic1') - expect(partitions.size).to eq(2) + partitions = ktl_zk.get_partitions_for_topics(scala_list(['topic1'])) + expect(partitions['topic1'].size).to eq(2) end it 'uses given replication factor' do - 2.times do |i| - replicas = Kafka::Utils::ZkUtils.get_replicas_for_partition(ktl_zk, 'topic1', i) + 2.times do |index| + replicas = ktl_zk.get_replicas_for_partition('topic1', index) expect(replicas.size).to eq(2) end end @@ -89,10 +89,12 @@ end it 'uses the given replica assignment' do - replicas = Kafka::Utils::ZkUtils.get_replicas_for_partition(ktl_zk, 'topic1', 0) - expect(replicas).to eq(scala_int_list([0, 1])) - replicas = Kafka::Utils::ZkUtils.get_replicas_for_partition(ktl_zk, 'topic1', 1) - expect(replicas).to eq(scala_int_list([1, 0])) + replicas = [] + ktl_zk.get_replicas_for_partition('topic1', 0).foreach { |r| replicas << r } + expect(replicas).to eq([0, 1]) + replicas.clear + ktl_zk.get_replicas_for_partition('topic1', 1).foreach { |r| replicas << r } + expect(replicas).to eq([1, 0]) end end @@ -110,15 +112,15 @@ describe 'add-partitions' do before do - create_topic('topic1', %w[topic1 --partitions 1 --replication-factor 2]) + create_topic(%w[topic1 --partitions 1 --replication-factor 2]) end it 'expands the number of partitions for given topic' do - partitions = Kafka::Utils.get_partitions_for_topic(ktl_zk, 'topic1') - expect(partitions.size).to eq(1) + partitions = ktl_zk.get_partitions_for_topics(scala_list(['topic1'])) + expect(partitions['topic1'].size).to eq(1) silence { run(%w[topic add-partitions], %w[topic1 --partitions 2] + zk_args) } - partitions = Kafka::Utils.get_partitions_for_topic(ktl_zk, 'topic1') - expect(partitions.size).to eq(2) + partitions = ktl_zk.get_partitions_for_topics(scala_list(['topic1'])) + expect(partitions['topic1'].size).to eq(2) end end @@ -130,38 +132,7 @@ it 'creates a delete marker for given topic' do silence { run(%w[topic delete], %w[topic1] + zk_args) } delete_path = Kafka::Utils::ZkUtils.get_delete_topic_path('topic1') - expect(ktl_zk.exists?(delete_path)).to be true - end - end - - describe 'reaper' do - let :kafka_broker do - Kafka::Test.create_kafka_server({ - 'broker.id' => 1, - 'port' => 9192, - 'zookeeper.connect' => zk_uri + zk_chroot, - }) - end - - before do - clear_zk_chroot - kafka_broker.start - create_topic(%w[topic1 --partitions 1]) - create_topic(%w[topic2 --partitions 2]) - create_topic(%w[topic-3 --partitions 3]) - wait_until_topics_exist('localhost:9192', %w[topic1 topic2 topic-3]) - end - - after do - kafka_broker.shutdown - end - - it 'creates a delete marker for each empty topic' do - silence { run(%w[topic reaper ^topic\d$ --delay 0], zk_args) } - %w[topic1 topic2].each do |topic| - delete_path = Kafka::Utils::ZkUtils.get_delete_topic_path(topic) - expect(ktl_zk.exists?(delete_path)).to be true - end + expect(ktl_zk.path_exists?(delete_path)).to be true end end diff --git a/spec/ktl/decommission_plan_spec.rb b/spec/ktl/decommission_plan_spec.rb index f9cb963..a6547cb 100644 --- a/spec/ktl/decommission_plan_spec.rb +++ b/spec/ktl/decommission_plan_spec.rb @@ -18,11 +18,11 @@ module Ktl end let :brokers do - b = Scala::Collection::Immutable::List.empty - b = b.send('::', to_int(0)) - b = b.send('::', to_int(1)) - b = b.send('::', to_int(2)) - b + brokers = Scala::Collection::Mutable::ArrayBuffer.empty + brokers.send('+=', 0) + brokers.send('+=', 1) + brokers.send('+=', 2) + brokers end let :partitions do @@ -84,11 +84,11 @@ module Ktl end let :brokers do - b = Scala::Collection::Immutable::List.empty - b = b.send('::', to_int(0)) - b = b.send('::', to_int(1)) - b = b.send('::', to_int(2)) - b = b.send('::', to_int(3)) + b = Scala::Collection::Mutable::ArrayBuffer.empty + b = b.send('+=', 0) + b = b.send('+=', 1) + b = b.send('+=', 2) + b = b.send('+=', 3) b end @@ -109,11 +109,11 @@ module Ktl context 'when there isn\'t enough brokers to meet replication factor' do let :brokers do - b = Scala::Collection::Immutable::List.empty - b = b.send('::', to_int(0)) - b = b.send('::', to_int(1)) - b = b.send('::', to_int(2)) - b = b.send('::', to_int(3)) + b = Scala::Collection::Mutable::ArrayBuffer.empty + b = b.send('+=', 0) + b = b.send('+=', 1) + b = b.send('+=', 2) + b = b.send('+=', 3) b end diff --git a/spec/ktl/kafka_client_spec.rb b/spec/ktl/kafka_client_spec.rb deleted file mode 100644 index 040fe83..0000000 --- a/spec/ktl/kafka_client_spec.rb +++ /dev/null @@ -1,304 +0,0 @@ -# encoding: utf-8 - -require 'spec_helper' - - -module Ktl - describe KafkaClient do - let :client do - described_class.new(config) - end - - let :config do - { - hosts: %w[kafka:9092 kafka:9093 kafka:9094], - consumer_impl: consumer_impl, - backoff: backoff, - sleeper: sleeper, - logger: logger, - } - end - - let :consumer_impl do - double(:consumer_impl) - end - - let :backoff do - 2 - end - - let :sleeper do - double(:sleeper, sleep: nil) - end - - let :logger do - double(:logger, debug: nil, info: nil, warn: nil) - end - - let :topic_metadata do - double(:topic_metadata) - end - - let :topic_name do - 'some.topic.name' - end - - let :partition_metadata do - [ - double(partition_id: 0), - double(partition_id: 1), - double(partition_id: 2), - double(partition_id: 3), - double(partition_id: 4), - double(partition_id: 5), - ] - end - - let :brokers do - brokers = [ - double(connection_string: 'kafka:9092'), - double(connection_string: 'kafka:9093'), - double(connection_string: 'kafka:9094'), - ] - brokers * 2 - end - - let :consumers do - { - 'kafka:9092' => double(:consumer0), - 'kafka:9093' => double(:consumer1), - 'kafka:9094' => double(:consumer2), - } - end - - let :metadata_requests do - Hash.new(0) - end - - let :partition_errors do - [0] * partition_metadata.size - end - - before do - allow(consumer_impl).to receive(:new) do |connection_string| - consumers[connection_string] - end - end - - before do - s = allow(topic_metadata).to(receive(:each)) - partition_metadata.each do |meta| - s.and_yield(topic_name, meta) - allow(topic_metadata).to receive(:leader_for).with(topic_name, meta.partition_id).and_return(brokers[meta.partition_id]) - end - end - - before do - consumers.each do |connection_string, consumer| - allow(consumer).to receive(:offsets_before) do |*requests| - response = double(:offset_response) - allow(response).to receive(:error?) do - brokers.each_with_index.any? { |_, i| partition_errors[i] != 0 } - end - allow(response).to receive(:error) do |_, partition_id| - partition_errors[partition_id] - end - partition_offsets.each do |partition, offsets| - if partition_errors[partition].zero? - allow(response).to receive(:offsets).with(topic_name, partition).and_return(Array(offsets)) - end - end - response - end - allow(consumer).to receive(:metadata) do - metadata_requests[connection_string] = metadata_requests[connection_string] + 1 - topic_metadata - end - end - end - - before do - client.setup - end - - shared_examples 'error handling of an retryable error' do - let :index do - 1 - end - - let :consumer do - consumers[brokers[index].connection_string] - end - - let :log_messages do - [] - end - - before do - tries = 3 - consumers.each do |connection_string, consumer| - allow(consumer).to receive(:metadata) do - metadata_requests[connection_string] = metadata_requests[connection_string] + 1 - if tries.zero? - partition_errors[index] = 0 - else - tries -= 1 - end - topic_metadata - end - end - partition_errors[index] = error_code - end - - before do - allow(logger).to receive(:debug) do |&block| - log_messages << block.call - end - end - - before do - client.send(method_name, {'some.topic.name' => [0, 1, 2]}, *method_args) - end - - it 'retries error\'ed partitions' do - expect(consumer).to have_received(:offsets_before).exactly(4).times - end - - it 'fetches new metadata' do - expect(metadata_requests.values.reduce(:+)).to eq(4) - end - - it 'waits a bit before retrying' do - expect(sleeper).to have_received(:sleep).with(backoff*1000).exactly(3).times - end - - it 'logs a message about retrying' do - messages = log_messages.select { |message| message.match(/Failed to find metadata/) } - expect(messages.size).to eq(3) - end - end - - shared_examples 'dealing with retryable errors' do - context 'when a NotLeaderForPartition error code is returned' do - let :error_code do - 6 - end - - include_examples 'error handling of an retryable error' - end - - context 'when a UnknownTopicOrPartition error code is returned' do - let :error_code do - 3 - end - - include_examples 'error handling of an retryable error' - end - - context 'when a LeaderNotAvailable error code is returned' do - let :error_code do - 5 - end - - include_examples 'error handling of an retryable error' - end - end - - describe '#setup' do - it 'connects to each broker' do - consumers.each_key do |connection_string| - expect(consumer_impl).to have_received(:new).with(connection_string) - end - end - end - - describe '#partitions' do - it 'fetches metadata from a consumer' do - client.partitions - expect(metadata_requests.values.reduce(:+)).to eq(1) - end - - context 'without an explicit filter' do - it 'returns a hash of current known topics and partitions' do - expect(client.partitions).to eq(topic_name => partition_metadata.map(&:partition_id)) - end - end - - context 'with an explicit filter' do - it 'ignores other topics' do - expect(client.partitions(/other/)).to eq({}) - end - end - end - - describe '#earliest_offset' do - let :partition_offsets do - partition_metadata.each_with_object({}) do |partition, hash| - hash[partition.partition_id] = partition.partition_id * 2 - end - end - - let :method_name do - :earliest_offset - end - - let :method_args do - [] - end - - it 'fetches earliest offsets for each partition' do - offsets = client.earliest_offset({'some.topic.name' => [0, 1, 2]}) - expect(offsets).to eq(topic_name => Hash[partition_offsets.take(3)]) - end - - include_examples 'dealing with retryable errors' - end - - describe '#latest_offset' do - let :partition_offsets do - partition_metadata.each_with_object({}) do |partition, hash| - hash[partition.partition_id] = (partition.partition_id * 2) + 1 - end - end - - let :method_name do - :latest_offset - end - - let :method_args do - [] - end - - it 'fetches latest offset for each partition' do - offsets = client.latest_offset({'some.topic.name' => [0, 1, 2]}) - expect(offsets).to eq(topic_name => Hash[partition_offsets.take(3)]) - end - - include_examples 'dealing with retryable errors' - end - - describe '#offset_before' do - let :partition_offsets do - partition_metadata.each_with_object({}) do |partition, hash| - hash[partition.partition_id] = [partition.partition_id + 1] - end - end - - let :method_name do - :offset_before - end - - let :method_args do - [Time.utc(2015, 3, 19, 13).to_i] - end - - it 'fetches offsets before the given timestamp for each partition' do - offsets = client.offset_before({'some.topic.name' => [0, 1, 2]}, *method_args) - expect(offsets).to eq(topic_name => Hash[partition_offsets.take(3).map { |k, v| [k, v.first] }]) - end - - include_examples 'dealing with retryable errors' - end - end -end - diff --git a/spec/ktl/migration_plan_spec.rb b/spec/ktl/migration_plan_spec.rb index 30ac3ba..7753aa9 100644 --- a/spec/ktl/migration_plan_spec.rb +++ b/spec/ktl/migration_plan_spec.rb @@ -36,18 +36,18 @@ module Ktl end it 'returns an object with topic-partitions <-> new AR mappings' do - f = plan.first + f = plan.head expect(f.first).to be_a(Kafka::TopicAndPartition) expect(f.last).to be_a(Scala::Collection::Mutable::MutableList) end it 'includes the new leader in the AR mapping' do - f = plan.first + f = plan.head expect(f.last.contains?(1)).to be true end it 'includes the previous ARs in the new mapping' do - f = plan.first + f = plan.head expect(f.last.contains?(2)).to be true end diff --git a/spec/ktl/reassigner_spec.rb b/spec/ktl/reassigner_spec.rb index 9c196b6..ebb29e0 100644 --- a/spec/ktl/reassigner_spec.rb +++ b/spec/ktl/reassigner_spec.rb @@ -13,6 +13,10 @@ module Ktl double(:zk_client) end + let :zk_utils do + Kafka::Utils::ZkUtils.new(nil, nil, false) + end + let :options do {} end @@ -93,7 +97,7 @@ module Ktl replicas = scala_int_list([0, 1, 2]) r += Scala::Tuple.new(topic_partition, replicas) end - Kafka::Utils::ZkUtils.get_partition_reassignment_zk_data(r) + zk_utils.format_as_reassignment_json(r) end let :overflow_part_2 do @@ -103,7 +107,7 @@ module Ktl replicas = scala_int_list([0, 1, 2]) r += Scala::Tuple.new(topic_partition, replicas) end - Kafka::Utils::ZkUtils.get_partition_reassignment_zk_data(r) + zk_utils.format_as_reassignment_json(r) end before do @@ -171,7 +175,7 @@ module Ktl end let :json do - Kafka::Utils::ZkUtils.get_partition_reassignment_zk_data(reassignment) + zk_utils.format_as_reassignment_json(reassignment) end it 'does not split the reassignment' do @@ -226,7 +230,7 @@ module Ktl it 'writes the remaining JSON to an overflow path in ZK' do overflow = Scala::Collection::Map.empty overflow = overflow_znodes.reduce(overflow) do |acc, (path, data)| - data = Kafka::Utils::ZkUtils.parse_partition_reassignment_data(data) + data = zk_utils.parse_partition_reassignment_data(data) acc.send('++', data) end expect(overflow.size).to eq(10_000) @@ -235,7 +239,7 @@ module Ktl it 'writes the same JSON to a state path' do state = Scala::Collection::Map.empty state = reassign_znodes.reduce(state) do |acc, (path, data)| - data = Kafka::Utils::ZkUtils.parse_partition_reassignment_data(data) + data = zk_utils.parse_partition_reassignment_data(data) acc.send('++', data) end expect(state.size).to eq(10_000) @@ -268,7 +272,7 @@ module Ktl it 'writes the remaining JSON to an overflow path in ZK' do overflow = Scala::Collection::Map.empty overflow = overflow_znodes.reduce(overflow) do |acc, (path, data)| - data = Kafka::Utils::ZkUtils.parse_partition_reassignment_data(data) + data = zk_utils.parse_partition_reassignment_data(data) acc.send('++', data) end expect(overflow.size).to eq(80) @@ -277,7 +281,7 @@ module Ktl it 'writes the same JSON to a state path' do reassigned = Scala::Collection::Map.empty reassigned = reassign_znodes.reduce(reassigned) do |acc, (path, data)| - data = Kafka::Utils::ZkUtils.parse_partition_reassignment_data(data) + data = zk_utils.parse_partition_reassignment_data(data) acc.send('++', data) end expect(reassigned.size).to eq(20) diff --git a/spec/ktl/shuffle_plan_spec.rb b/spec/ktl/shuffle_plan_spec.rb index 58925b4..e0e0906 100644 --- a/spec/ktl/shuffle_plan_spec.rb +++ b/spec/ktl/shuffle_plan_spec.rb @@ -212,4 +212,99 @@ def apply_reassignments(scala_reassignments) end end end + + describe RackAwareShufflePlan do + let :zk_utils do + double(:zk_utils) + end + + def generate_broker_metadata(broker_id) + @brokers[broker_id] ||= begin + index = broker_id % 10 + double("broker_#{index}", id: broker_id).tap do |broker| + rack_name = "rack-#{index}" + rack = double(rack_name, isDefined: true, get: rack_name) + allow(broker).to receive(:rack).and_return(rack) + end + end + end + + before do + @brokers = {} + allow(zk_client).to receive(:utils).and_return(zk_utils) + allow(Kafka::Admin).to receive(:get_broker_metadatas) do |zk_client, broker_list| + broker_list.map do |broker| + generate_broker_metadata(broker) + end + end + end + + describe '#generate' do + include_examples 'a shuffle plan' + + def each_reassignment(scala_reassignments) + ScalaEnumerable.new(scala_reassignments).each_with_object({}) do |t, result| + yield t.first.topic, t.first.partition, ScalaEnumerable.new(t.last).to_a + end + end + + def apply_reassignments(scala_reassignments) + each_reassignment(scala_reassignments) do |topic, partition, brokers| + assignments[topic][partition] = brokers + end + end + + context 'when adding brokers' do + before do + apply_reassignments(plan.generate) + brokers << 0xb3 + end + + it 'does not reassign leader to anything but the new broker' do + each_reassignment(plan.generate) do |topic, partition, brokers| + expect(brokers[0]).to satisfy { |leader| leader == 0xb3 || leader == assignments[topic][partition][0] } + end + end + + it 'demotes remaining brokers if new leader elected' do + each_reassignment(plan.generate) do |topic, partition, brokers| + if brokers[0] == 0xb3 + expect(brokers.drop(1)).to eq(assignments[topic][partition].take(replica_count-1)) + end + end + end + + it 'does not reassign followers to anything but the new broker' do + each_reassignment(plan.generate) do |topic, partition, brokers| + unless brokers[0] == 0xb3 + expect(brokers[1]).to satisfy { |follower| follower == 0xb3 || follower == assignments[topic][partition][1] } + end + end + end + end + + context 'with multiple brokers per rack' do + let :brokers do + [201, 202, 203, 101, 102, 103] + end + + let :replica_count do + 3 + end + + it 'chooses one broker per rack' do + each_reassignment(plan.generate) do |topic, partition, brokers| + racks = brokers.map { |broker| generate_broker_metadata(broker).rack.get } + expect(racks.uniq.size).to eql(3) + end + end + + it 'raises exception if broker is missing rack configuration' do + broker_metadata = generate_broker_metadata(203) + allow(broker_metadata.rack).to receive(:isDefined).and_return(false) + expect { plan.generate }.to raise_error /Broker 203 is missing rack information/ + end + end + end + end end diff --git a/spec/support/integration.rb b/spec/support/integration.rb index be783f1..820a335 100644 --- a/spec/support/integration.rb +++ b/spec/support/integration.rb @@ -6,19 +6,19 @@ end let :zk_uri do - 'localhost:2185' + '127.0.0.1:2185' end let :zk_chroot do '/ktl-test' end - let :control_zk do - Kafka::Utils.new_zk_client(zk_uri) + let :ktl_zk do + Kafka::Utils::ZkUtils.apply(zk_uri + zk_chroot, 30_000, 30_000, false) end - let :ktl_zk do - Kafka::Utils.new_zk_client(zk_uri + zk_chroot) + let :zk_utils do + Kafka::Utils::ZkUtils.apply(zk_uri, 30000, 30000, false) end let :zk_args do @@ -30,46 +30,60 @@ def run(command, argv) end def fetch_json(path, key=nil) - d = Kafka::Utils::ZkUtils.read_data(ktl_zk, path).first + d = ktl_zk.read_data(path).first d = JSON.parse(d) key ? d[key] : d end def register_broker(id, name='localhost') - Kafka::Utils::ZkUtils.register_broker_in_zk(ktl_zk, id, name, 9092, 1, 57476) + endpoint_tuple = Scala::Tuple.new( + Kafka::Protocol::SecurityProtocol.for_name('PLAINTEXT'), + Kafka::Cluster::EndPoint.create_end_point("PLAINTEXT://#{name}:9092") + ) + endpoints = Scala::Collection::Map.empty + endpoints += endpoint_tuple + ktl_zk.register_broker_in_zk(id, name, 9092, endpoints, 57476, Scala::Option["rack#{id}"], Kafka::Api::ApiVersion.apply('0.10.0.1')) end def clear_zk_chroot - Kafka::Utils::ZkUtils.delete_path_recursive(control_zk, zk_chroot) + zk_utils.delete_path_recursive(zk_chroot) end def setup_zk_chroot clear_zk_chroot - Kafka::Utils::ZkUtils.make_sure_persistent_path_exists(control_zk, zk_chroot) - Kafka::Utils::ZkUtils.setup_common_paths(ktl_zk) + zk_utils.create_persistent_path(zk_chroot, '', no_acl) + ktl_zk.setup_common_paths end def create_topic(*args) silence { run(%w[topic create], args + zk_args) } end + def no_acl + Kafka::Utils::ZkUtils::DefaultAcls(false) + end + def create_partitions(topic, options={}) - partitions_path = Kafka::Utils::ZkUtils.get_topic_partitions_path(topic) - Kafka::Utils::ZkUtils.create_persistent_path(ktl_zk, partitions_path, '') + partitions_path = ktl_zk.class.get_topic_partitions_path(topic) + ktl_zk.create_persistent_path(partitions_path, '', no_acl) partitions = options.fetch(:partitions, 1) partitions.times.map do |i| state_path = %(#{partitions_path}/#{i}/state) isr = options.fetch(:isr, [0]) state = {controller_epoch: 1, leader: isr.first, leader_epoch: 1, version: 1, isr: isr} - Kafka::Utils::ZkUtils.create_persistent_path(ktl_zk, state_path, state.to_json) + ktl_zk.create_persistent_path(state_path, state.to_json, no_acl) end end def wait_until_topics_exist(broker, topics) topics_exist, attempts = false, 0 + host, port = broker.split(':') + port = port.to_i + consumer = Kafka::Consumer::SimpleConsumer.new(host, port, 30_000, 64*1024, "ktl-integration-#{rand(100000)}") until topics_exist do - consumer = Heller::Consumer.new(broker) - metadata = consumer.metadata + request = Kafka::JavaApi::TopicMetadataRequest.new([]) + metadata = Kafka::TopicMetadataResponse.new(consumer.send(request)) + if topics.all? { |topic| metadata.leader_for(topic, 0) rescue false } topics_exist = true elsif attempts > 10 @@ -79,17 +93,10 @@ def wait_until_topics_exist(broker, topics) attempts += 1 end consumer.close + Kafka::Metrics::KafkaMetricsGroup.remove_all_consumer_metrics(consumer.client_id) end end - def publish_messages(broker, topic, num_messages, key=nil) - producer = Heller::Producer.new(broker) - num_messages.times do |num| - producer.push(Heller::Message.new(topic, "message #{num}", (key || num).to_s)) - end - producer.close - end - before do zk_server.start setup_zk_chroot @@ -97,7 +104,7 @@ def publish_messages(broker, topic, num_messages, key=nil) after do clear_zk_chroot - control_zk.close + zk_utils.close ktl_zk.close zk_server.shutdown end diff --git a/spec/support/kafka_utils.rb b/spec/support/kafka_utils.rb index cb0c822..73509da 100644 --- a/spec/support/kafka_utils.rb +++ b/spec/support/kafka_utils.rb @@ -1,6 +1,31 @@ # encoding: utf-8 +require 'slf4j-jars' +require 'kafka-jars' + + module Kafka + module JavaApi + java_import 'kafka.javaapi.TopicMetadata' + java_import 'kafka.javaapi.TopicMetadataRequest' + java_import 'kafka.javaapi.TopicMetadataResponse' + end + + module Consumer + include_package 'kafka.consumer' + java_import 'kafka.javaapi.consumer.SimpleConsumer' + end + + module Metrics + java_import 'kafka.metrics.KafkaMetricsGroup' + + module KafkaMetricsGroup + def self.remove_all_consumer_metrics(client_id) + self.removeAllConsumerMetrics(client_id) + end + end + end + module Test java_import 'java.net.InetSocketAddress' java_import 'org.apache.zookeeper.server.ZooKeeperServer' @@ -63,4 +88,67 @@ def shutdown end end end + + class TopicMetadataResponse + include Enumerable + + def initialize(underlying) + @underlying = underlying + + @cache = Hash.new.tap do |h| + CACHES.each do |type| + h[type] = Hash.new({}) + end + end + end + + def each(&block) + metadata.each do |topic_metadata| + topic_metadata.partitions_metadata.each do |partition_metadata| + yield topic_metadata.topic, partition_metadata + end + end + end + + def metadata + @underlying.topics_metadata + end + + def leader_for(topic, partition) + with_cache(:leader, topic, partition) + end + + def isr_for(topic, partition) + with_cache(:isr, topic, partition) + end + alias_method :in_sync_replicas_for, :isr_for + + private + + CACHES = [:leader, :isr].freeze + + def with_cache(type, topic, partition) + return @cache[type][topic][partition] if @cache[type][topic][partition] + + partition_metadata = locate_partition_metadata(topic, partition) + + if partition_metadata + @cache[type][topic][partition] = partition_metadata.send(type) + else + raise NoSuchTopicPartitionCombinationError, "Cannot find (#{topic}:#{partition}) combination" + end + end + + def locate_partition_metadata(topic, partition) + metadata.each do |tm| + if tm.topic == topic + tm.partitions_metadata.each do |pm| + return pm if pm.partition_id == partition + end + end + end + + nil + end + end end