Skip to content
This repository has been archived by the owner on Mar 10, 2023. It is now read-only.

Commit

Permalink
Add verbose and dryrun options to cluster shuffle
Browse files Browse the repository at this point in the history
--verbose will output all assignments executed in the current run
--dryrun will output all reassignments in the current plan
  • Loading branch information
David Dahl committed Feb 15, 2017
1 parent 36c5cb7 commit efcc15a
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 8 deletions.
16 changes: 10 additions & 6 deletions lib/ktl/cluster.rb
Expand Up @@ -36,7 +36,7 @@ def migrate_broker
with_zk_client do |zk_client|
old_leader, new_leader = options.values_at(:from, :to)
plan = MigrationPlan.new(zk_client, old_leader, new_leader)
reassigner = Reassigner.new(zk_client, limit: options.limit)
reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger)
execute_reassignment(reassigner, plan)
end
end
Expand All @@ -49,6 +49,8 @@ def migrate_broker
option :replication_factor, aliases: %w[-r], type: :numeric, desc: 'Replication factor to use'
option :limit, aliases: %w[-l], type: :numeric, desc: 'Max number of partitions to reassign at a time'
option :zookeeper, aliases: %w[-z], required: true, desc: 'ZooKeeper URI'
option :verbose, aliases: %w[-v], desc: 'Verbose output'
option :dryrun, aliases: %w[-d], desc: 'Output reassignment plan without executing'
def shuffle(regexp='.*')
with_zk_client do |zk_client|
plan_factory = if options.rack_aware
Expand All @@ -63,9 +65,11 @@ def shuffle(regexp='.*')
brokers: options.brokers,
blacklist: options.blacklist,
replication_factor: options.replication_factor,
logger: logger,
log_plan: options.dryrun,
})
reassigner = Reassigner.new(zk_client, limit: options.limit)
execute_reassignment(reassigner, plan)
reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger, log_assignments: options.verbose)
execute_reassignment(reassigner, plan, options.dryrun)
end
end

Expand All @@ -80,7 +84,7 @@ def decommission_broker(broker_id)
else
plan = DecommissionPlan.new(zk_client, broker_id.to_i)
end
reassigner = Reassigner.new(zk_client, limit: options.limit)
reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger)
execute_reassignment(reassigner, plan)
end
end
Expand All @@ -97,8 +101,8 @@ def reassignment_progress

private

def execute_reassignment(reassigner, plan)
ReassignmentTask.new(reassigner, plan, shell, logger: logger).execute
def execute_reassignment(reassigner, plan, dryrun = false)
ReassignmentTask.new(reassigner, plan, shell, logger: logger).execute(dryrun)
end
end
end
9 changes: 9 additions & 0 deletions lib/ktl/reassigner.rb
Expand Up @@ -7,6 +7,8 @@ def initialize(zk_client, options={})
@limit = options[:limit]
@overflow_path = '/ktl/overflow'
@state_path = '/ktl/reassign'
@logger = options[:logger] || NullLogger.new
@log_assignments = !!options[:log_assignments]
end

def reassignment_in_progress?
Expand Down Expand Up @@ -36,6 +38,13 @@ def load_overflow
def execute(reassignment)
reassignments = split(reassignment, @limit)
actual_reassignment = reassignments.shift
if @log_assignments
Scala::Collection::JavaConversions.as_java_iterable(actual_reassignment).each do |pr|
topic_and_partition, replicas = pr.elements
brokers = Scala::Collection::JavaConversions.as_java_iterable(replicas).to_a
@logger.info "Assigning #{topic_and_partition.topic},#{topic_and_partition.partition} to #{brokers.join(',')}"
end
end
json = reassignment_json(actual_reassignment)
@zk_client.reassign_partitions(json)
manage_overflow(reassignments)
Expand Down
8 changes: 6 additions & 2 deletions lib/ktl/reassignment_task.rb
Expand Up @@ -9,7 +9,7 @@ def initialize(reassigner, plan, shell, options={})
@logger = options[:logger] || NullLogger.new
end

def execute
def execute(dryrun = false)
if @reassigner.reassignment_in_progress?
@logger.warn 'reassignment already in progress, exiting'
else
Expand All @@ -22,7 +22,11 @@ def execute
end
if reassignment.size > 0
@logger.info 'reassigning %d partitions' % reassignment.size
@reassigner.execute(reassignment)
if dryrun
@logger.info 'dryrun detected, skipping reassignment'
else
@reassigner.execute(reassignment)
end
else
@logger.warn 'empty reassignment, ignoring'
end
Expand Down
3 changes: 3 additions & 0 deletions lib/ktl/shuffle_plan.rb
Expand Up @@ -5,6 +5,8 @@ class ShufflePlan
def initialize(zk_client, options = {})
@zk_client = zk_client
@options = options
@logger = options[:logger] || NullLogger.new
@log_plan = !!options[:log_plan]
end

def generate
Expand All @@ -26,6 +28,7 @@ def generate
topic_partition = Kafka::TopicAndPartition.new(topic, partition)
current_assignment = replica_assignments.apply(topic_partition)
unless current_assignment == replicas
@logger.info "Moving #{topic_partition.topic},#{topic_partition.partition} from #{current_assignment} to #{replicas}" if @log_plan
reassignment_plan += Scala::Tuple.new(topic_partition, replicas)
end
end
Expand Down

0 comments on commit efcc15a

Please sign in to comment.