diff --git a/lib/ktl/cluster.rb b/lib/ktl/cluster.rb index 2e9c7e0..3ecc810 100644 --- a/lib/ktl/cluster.rb +++ b/lib/ktl/cluster.rb @@ -36,7 +36,7 @@ def migrate_broker with_zk_client do |zk_client| old_leader, new_leader = options.values_at(:from, :to) plan = MigrationPlan.new(zk_client, old_leader, new_leader) - reassigner = Reassigner.new(zk_client, limit: options.limit) + reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger) execute_reassignment(reassigner, plan) end end @@ -49,6 +49,8 @@ def migrate_broker option :replication_factor, aliases: %w[-r], type: :numeric, desc: 'Replication factor to use' option :limit, aliases: %w[-l], type: :numeric, desc: 'Max number of partitions to reassign at a time' option :zookeeper, aliases: %w[-z], required: true, desc: 'ZooKeeper URI' + option :verbose, aliases: %w[-v], desc: 'Verbose output' + option :dryrun, aliases: %w[-d], desc: 'Output reassignment plan without executing' def shuffle(regexp='.*') with_zk_client do |zk_client| plan_factory = if options.rack_aware @@ -63,9 +65,11 @@ def shuffle(regexp='.*') brokers: options.brokers, blacklist: options.blacklist, replication_factor: options.replication_factor, + logger: logger, + log_plan: options.dryrun, }) - reassigner = Reassigner.new(zk_client, limit: options.limit) - execute_reassignment(reassigner, plan) + reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger, log_assignments: options.verbose) + execute_reassignment(reassigner, plan, options.dryrun) end end @@ -80,7 +84,7 @@ def decommission_broker(broker_id) else plan = DecommissionPlan.new(zk_client, broker_id.to_i) end - reassigner = Reassigner.new(zk_client, limit: options.limit) + reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger) execute_reassignment(reassigner, plan) end end @@ -97,8 +101,8 @@ def reassignment_progress private - def execute_reassignment(reassigner, plan) - ReassignmentTask.new(reassigner, plan, shell, logger: logger).execute + def execute_reassignment(reassigner, plan, dryrun = false) + ReassignmentTask.new(reassigner, plan, shell, logger: logger).execute(dryrun) end end end diff --git a/lib/ktl/reassigner.rb b/lib/ktl/reassigner.rb index 1eb8a5e..4ca5afb 100644 --- a/lib/ktl/reassigner.rb +++ b/lib/ktl/reassigner.rb @@ -7,6 +7,8 @@ def initialize(zk_client, options={}) @limit = options[:limit] @overflow_path = '/ktl/overflow' @state_path = '/ktl/reassign' + @logger = options[:logger] || NullLogger.new + @log_assignments = !!options[:log_assignments] end def reassignment_in_progress? @@ -36,6 +38,13 @@ def load_overflow def execute(reassignment) reassignments = split(reassignment, @limit) actual_reassignment = reassignments.shift + if @log_assignments + Scala::Collection::JavaConversions.as_java_iterable(actual_reassignment).each do |pr| + topic_and_partition, replicas = pr.elements + brokers = Scala::Collection::JavaConversions.as_java_iterable(replicas).to_a + @logger.info "Assigning #{topic_and_partition.topic},#{topic_and_partition.partition} to #{brokers.join(',')}" + end + end json = reassignment_json(actual_reassignment) @zk_client.reassign_partitions(json) manage_overflow(reassignments) diff --git a/lib/ktl/reassignment_task.rb b/lib/ktl/reassignment_task.rb index c7c45a4..1582646 100644 --- a/lib/ktl/reassignment_task.rb +++ b/lib/ktl/reassignment_task.rb @@ -9,7 +9,7 @@ def initialize(reassigner, plan, shell, options={}) @logger = options[:logger] || NullLogger.new end - def execute + def execute(dryrun = false) if @reassigner.reassignment_in_progress? @logger.warn 'reassignment already in progress, exiting' else @@ -22,7 +22,11 @@ def execute end if reassignment.size > 0 @logger.info 'reassigning %d partitions' % reassignment.size - @reassigner.execute(reassignment) + if dryrun + @logger.info 'dryrun detected, skipping reassignment' + else + @reassigner.execute(reassignment) + end else @logger.warn 'empty reassignment, ignoring' end diff --git a/lib/ktl/shuffle_plan.rb b/lib/ktl/shuffle_plan.rb index f258153..3654a44 100644 --- a/lib/ktl/shuffle_plan.rb +++ b/lib/ktl/shuffle_plan.rb @@ -5,6 +5,8 @@ class ShufflePlan def initialize(zk_client, options = {}) @zk_client = zk_client @options = options + @logger = options[:logger] || NullLogger.new + @log_plan = !!options[:log_plan] end def generate @@ -26,6 +28,7 @@ def generate topic_partition = Kafka::TopicAndPartition.new(topic, partition) current_assignment = replica_assignments.apply(topic_partition) unless current_assignment == replicas + @logger.info "Moving #{topic_partition.topic},#{topic_partition.partition} from #{current_assignment} to #{replicas}" if @log_plan reassignment_plan += Scala::Tuple.new(topic_partition, replicas) end end