Skip to content
This repository has been archived by the owner on Mar 10, 2023. It is now read-only.

Add verbose and dryrun options to cluster shuffle #4

Merged
merged 4 commits into from Feb 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Expand Up @@ -94,6 +94,13 @@ there are also some commands for managing topics, most of which merely wrap the
`kafka-topics.sh` tool that is bundled with Kafka, but with a slightly different
interface.

The most notable difference is the `create` subcommand, which can generate a
consistent hashing plan and assign it to the new topic during creation.

```shell
$ ktl topic create 'some.topic' --partitions 3 --replication_factor 3 --rack_aware_allocation -z localhost:2181/test
```

## Copyright

© 2015 Burt AB, see LICENSE.txt (BSD 3-Clause).
16 changes: 10 additions & 6 deletions lib/ktl/cluster.rb
Expand Up @@ -36,7 +36,7 @@ def migrate_broker
with_zk_client do |zk_client|
old_leader, new_leader = options.values_at(:from, :to)
plan = MigrationPlan.new(zk_client, old_leader, new_leader)
reassigner = Reassigner.new(zk_client, limit: options.limit)
reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger)
execute_reassignment(reassigner, plan)
end
end
Expand All @@ -49,6 +49,8 @@ def migrate_broker
option :replication_factor, aliases: %w[-r], type: :numeric, desc: 'Replication factor to use'
option :limit, aliases: %w[-l], type: :numeric, desc: 'Max number of partitions to reassign at a time'
option :zookeeper, aliases: %w[-z], required: true, desc: 'ZooKeeper URI'
option :verbose, aliases: %w[-v], desc: 'Verbose output'
option :dryrun, aliases: %w[-d], desc: 'Output reassignment plan without executing'
def shuffle(regexp='.*')
with_zk_client do |zk_client|
plan_factory = if options.rack_aware
Expand All @@ -63,9 +65,11 @@ def shuffle(regexp='.*')
brokers: options.brokers,
blacklist: options.blacklist,
replication_factor: options.replication_factor,
logger: logger,
log_plan: options.dryrun,
})
reassigner = Reassigner.new(zk_client, limit: options.limit)
execute_reassignment(reassigner, plan)
reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger, log_assignments: options.verbose)
execute_reassignment(reassigner, plan, options.dryrun)
end
end

Expand All @@ -80,7 +84,7 @@ def decommission_broker(broker_id)
else
plan = DecommissionPlan.new(zk_client, broker_id.to_i)
end
reassigner = Reassigner.new(zk_client, limit: options.limit)
reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger)
execute_reassignment(reassigner, plan)
end
end
Expand All @@ -97,8 +101,8 @@ def reassignment_progress

private

def execute_reassignment(reassigner, plan)
ReassignmentTask.new(reassigner, plan, shell, logger: logger).execute
def execute_reassignment(reassigner, plan, dryrun = false)
ReassignmentTask.new(reassigner, plan, shell, logger: logger).execute(dryrun)
end
end
end
9 changes: 9 additions & 0 deletions lib/ktl/reassigner.rb
Expand Up @@ -7,6 +7,8 @@ def initialize(zk_client, options={})
@limit = options[:limit]
@overflow_path = '/ktl/overflow'
@state_path = '/ktl/reassign'
@logger = options[:logger] || NullLogger.new
@log_assignments = !!options[:log_assignments]
end

def reassignment_in_progress?
Expand Down Expand Up @@ -36,6 +38,13 @@ def load_overflow
def execute(reassignment)
reassignments = split(reassignment, @limit)
actual_reassignment = reassignments.shift
if @log_assignments
Scala::Collection::JavaConversions.as_java_iterable(actual_reassignment).each do |pr|
topic_and_partition, replicas = pr.elements
brokers = Scala::Collection::JavaConversions.as_java_iterable(replicas).to_a
@logger.info "Assigning #{topic_and_partition.topic},#{topic_and_partition.partition} to #{brokers.join(',')}"
end
end
json = reassignment_json(actual_reassignment)
@zk_client.reassign_partitions(json)
manage_overflow(reassignments)
Expand Down
8 changes: 6 additions & 2 deletions lib/ktl/reassignment_task.rb
Expand Up @@ -9,7 +9,7 @@ def initialize(reassigner, plan, shell, options={})
@logger = options[:logger] || NullLogger.new
end

def execute
def execute(dryrun = false)
if @reassigner.reassignment_in_progress?
@logger.warn 'reassignment already in progress, exiting'
else
Expand All @@ -22,7 +22,11 @@ def execute
end
if reassignment.size > 0
@logger.info 'reassigning %d partitions' % reassignment.size
@reassigner.execute(reassignment)
if dryrun
@logger.info 'dryrun detected, skipping reassignment'
else
@reassigner.execute(reassignment)
end
else
@logger.warn 'empty reassignment, ignoring'
end
Expand Down
13 changes: 13 additions & 0 deletions lib/ktl/shuffle_plan.rb
Expand Up @@ -5,6 +5,8 @@ class ShufflePlan
# Stores the ZooKeeper client and the raw options hash, and derives the
# logging configuration from that hash.
#
# Recognised logging keys in +options+:
# * +:logger+   - object used for log output; a NullLogger is used when the
#   key is missing or falsy.
# * +:log_plan+ - any truthy value enables plan logging; stored as a strict
#   boolean.
def initialize(zk_client, options = {})
  @zk_client, @options = zk_client, options
  @logger = options[:logger] || NullLogger.new
  @log_plan = options[:log_plan] ? true : false
end

def generate
Expand All @@ -26,13 +28,24 @@ def generate
topic_partition = Kafka::TopicAndPartition.new(topic, partition)
current_assignment = replica_assignments.apply(topic_partition)
unless current_assignment == replicas
@logger.info "Moving #{topic_partition.topic},#{topic_partition.partition} from #{current_assignment} to #{replicas}" if @log_plan
reassignment_plan += Scala::Tuple.new(topic_partition, replicas)
end
end
end
reassignment_plan
end

# Computes a replica assignment for a topic that is about to be created.
#
# Brokers come from +select_brokers+ and the replication factor from
# +@options[:replication_factor]+ (1 when unset). Both are handed to
# +assign_replicas_to_brokers+ together with the topic name and partition
# count.
#
# Returns one entry per element of that assignment: the element's replica
# collection converted to a plain Ruby array (the spec describes the result
# as a nested list of broker ids). The partition half of each element is
# not part of the result.
def generate_for_new_topic(topic, partition_count)
  candidates = select_brokers
  replica_count = @options[:replication_factor] || 1
  planned = assign_replicas_to_brokers(topic, candidates, partition_count, replica_count)
  planned.map { |entry|
    # Each entry destructures into (partition, replicas); only replicas are used.
    _partition, replica_seq = entry.elements
    Scala::Collection::JavaConversions.as_java_iterable(replica_seq).to_a
  }
end

private

def select_brokers
Expand Down
24 changes: 20 additions & 4 deletions lib/ktl/topic.rb
Expand Up @@ -33,20 +33,36 @@ def describe(regexp=nil)
option :replication_factor, aliases: %w[-r], default: '1', desc: 'Replication factor for new topic(s)'
option :replica_assignment, aliases: %w[-a], desc: 'Manual replica assignment'
option :disable_rack_aware, desc: 'Disable rack awareness'
option :rack_aware_allocation, desc: 'Create partitions with Rack aware + Rendezvous-hashing based shuffle'
option :rendezvous_allocation, desc: 'Create partitions with Rendezvous-hashing based shuffle'
option :config, aliases: %w[-c], desc: 'Key-value pairs of configuration options', type: :hash, default: {}
option :zookeeper, aliases: %w[-z], required: true, desc: 'ZooKeeper URI'
def create(*names)
with_zk_client do |zk_client|
names.each do |name|
opts = options.merge(create: nil, topic: name)
if options.rack_aware_allocation || options.rendezvous_allocation
plan_factory = if options.rack_aware_allocation
RackAwareShufflePlan
else
RendezvousShufflePlan
end

plan = plan_factory.new(zk_client, replication_factor: options.replication_factor.to_i)
zk_utils = Kafka::Utils::ZkUtils.new(nil, nil, false)
opts.delete(:rack_aware_allocation)
opts.delete(:rendezvous_allocation)
plan = plan.generate_for_new_topic(name, options.partitions.to_i)
opts[:replica_assignment] = plan.map {|broker_list| broker_list.join(':')}.join(',')
end
topic_options = Kafka::Admin.to_topic_options(opts)
silence_scala do
Kafka::Admin::TopicCommand.create_topic(zk_client.raw_client, topic_options)
end
message = %(created topic "#{name}" with #{options.partitions} partition(s))
message << %(, and replication factor #{options.replication_factor})
message << %(, with replica assignment: #{options.replica_assignment}) if options.replica_assignment
message << %(, with config: #{options.config}) unless options.config.empty?
message = %(created topic "#{name}" with #{opts[:partitions]} partition(s))
message << %(, and replication factor #{opts[:replication_factor]})
message << %(, with replica assignment: #{opts[:replica_assignment]}) if opts[:replica_assignment]
message << %(, with config: #{opts[:config]}) unless opts[:config].empty?
logger.info(message)
end
end
Expand Down
16 changes: 16 additions & 0 deletions spec/ktl/shuffle_plan_spec.rb
Expand Up @@ -158,6 +158,22 @@ module Ktl
end
end
end

# Exercises the plan's generate_for_new_topic with an explicit replication
# factor layered on top of the enclosing context's options.
context 'generate_for_new_topic' do
  let :options do
    # `let` is implemented with define_method, where a bare `super` (implicit
    # argument passing) is not supported; call the parent definition with
    # explicit empty parentheses instead.
    super().merge(replication_factor: replica_count)
  end

  it 'generates a nested list of broker ids for a new topic' do
    assignments.each do |topic, assignment|
      generated_plan = plan.generate_for_new_topic(topic, assignment.size)
      # One entry per requested partition...
      expect(generated_plan.size).to eql(assignment.size)
      generated_plan.each do |partition|
        # ...and each entry lists replica_count distinct brokers.
        expect(partition.uniq.size).to eql(replica_count)
      end
    end
  end
end
end

describe ShufflePlan do
Expand Down