This repository has been archived by the owner on Mar 10, 2023. It is now read-only.

Commit 36c5cb7: Merge branch 'rack_awareness'

David Dahl committed Jan 13, 2017
2 parents: 6c8b1c4 + 389367d
Showing 27 changed files with 412 additions and 690 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -5,7 +5,7 @@ bundler_args: --without development
rvm:
- jruby
env:
-- JRUBY_OPTS="$JRUBY_OPTS -Xcli.debug=true --debug"
+- JRUBY_OPTS="$JRUBY_OPTS -Xcli.debug=true --debug -J-Xms512m -J-Xmx512m"
before_install:
- password=`openssl rsautl -decrypt -inkey ~/.ssh/id_rsa -in config/travis/secret.rsaenc`
- openssl aes-256-cbc -k "$password" -in config/travis/burtbot-travis-id_rsa.aes256enc -d -a -out id_private
7 changes: 6 additions & 1 deletion README.md
@@ -51,7 +51,8 @@ track the progress of a reassignment.
The `shuffle` subcommand can either perform a random reassignment of partitions,
or it can use [rendezvous hashing](http://en.wikipedia.org/wiki/Rendezvous_hashing),
which will minimize the number of partitions that has to move between replicas
-when adding or removing brokers.
+when adding or removing brokers. There is also rack aware rendezvous hashing,
+which will ensure that data is properly replicated across racks.

To start a random reassignment of the replica assignment of partitions matching
`^test.*`, but leaving the rest alone:
@@ -62,6 +63,10 @@ To do the same thing but using rendezvous hashing:
```shell
$ ktl cluster shuffle '^test.*' -R -z localhost:2181/test
```
+To do the same thing using rack aware rendezvous hashing:
+```shell
+$ ktl cluster shuffle '^test.*' -a -z localhost:2181/test
+```
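As an aside that is not part of the README diff: the new `-a` flag composes with the shuffle options defined in `lib/ktl/cluster.rb`, such as `-r` (replication factor) and `-l` (max partitions reassigned at a time). A sketch of such an invocation, assuming those options behave as their descriptions in this commit state:

```shell
$ ktl cluster shuffle '^test.*' -a -r 3 -l 10 -z localhost:2181/test
```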

#### Dealing with ZooKeeper's znode limit

2 changes: 1 addition & 1 deletion ktl.gemspec
@@ -22,6 +22,6 @@ Gem::Specification.new do |s|

s.platform = 'java'

-s.add_runtime_dependency 'heller', '~> 0.2'
+s.add_runtime_dependency 'kafka-jars', '= 0.10.0.1'
s.add_runtime_dependency 'thor', '~> 0', '< 1.0'
end
35 changes: 31 additions & 4 deletions lib/ext/kafka.rb
@@ -2,7 +2,6 @@

require 'kafka-jars'


module Log4j
include_package 'org.apache.log4j'
java_import 'org.apache.log4j.Logger'
@@ -22,6 +21,7 @@ module Exception
module Scala
java_import 'scala.Console'
java_import 'scala.Tuple2'
+java_import 'scala.Option'

class Tuple2
alias_method :first, :_1
@@ -68,15 +68,24 @@ def self.new_zk_client(zk_connect, timeout=30_000)

def self.get_partitions_for_topic(zk, topic)
topics = Scala::Collection::Immutable::List.from_array([topic].to_java)
-partitions = ZkUtils.get_partitions_for_topics(zk, topics)
+partitions = zk.get_partitions_for_topics(topics)
partitions.get(topic).get
end

def self.delete_topic(zk, topic)
-ZkUtils.create_persistent_path(zk, ZkUtils.get_delete_topic_path(topic), '')
+acl = Kafka::Utils::ZkUtils::DefaultAcls(false)
+zk.create_persistent_path(ZkUtils.get_delete_topic_path(topic), '', acl)
end
end

+module Api
+include_package 'kafka.api'
+end
+
+module Cluster
+include_package 'kafka.cluster'
+end

module Admin
include_package 'kafka.admin'

@@ -101,9 +110,27 @@ def self.preferred_replica(zk_client, topics_partitions)
end

def self.assign_replicas_to_brokers(brokers, partitions, repl_factor, index=-1, partition=-1)
-assignment = AdminUtils.assign_replicas_to_brokers(brokers, partitions, repl_factor, index, partition)
+assignment = AdminUtils.assign_replicas_to_brokers(brokers, partitions.to_java(:int), repl_factor.to_java(:int), index.to_java(:int), partition.to_java(:int))
ScalaEnumerable.new(assignment)
end

+def self.get_broker_metadatas(zk_client, brokers, force_rack = true)
+rack_aware = if force_rack
+JRuby.runtime.jruby_class_loader.load_class('kafka.admin.RackAwareMode$Enforced$').get_declared_field('MODULE$').get(nil)
+else
+JRuby.runtime.jruby_class_loader.load_class('kafka.admin.RackAwareMode$Safe$').get_declared_field('MODULE$').get(nil)
+end
+broker_metadatas = Kafka::Admin::AdminUtils.get_broker_metadatas(
+zk_client.utils,
+rack_aware,
+Scala::Option[Scala::Collection::JavaConversions.as_scala_iterable(brokers).to_list]
+)
+Scala::Collection::JavaConversions.seq_as_java_list(broker_metadatas).to_a
+end
end

+module Protocol
+java_import 'org.apache.kafka.common.protocol.SecurityProtocol'
+end

module Common
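The `RackAwareMode$Enforced$` and `RackAwareMode$Safe$` lookups in `get_broker_metadatas` go through the JRuby class loader because a Scala case object exposes its singleton instance as a static `MODULE$` field. A minimal sketch of that pattern, using a hypothetical `scala_object` helper that is not part of this commit:

```ruby
require 'jruby'

# Hypothetical helper (not in ktl): load a Scala object's class and read its
# static MODULE$ field to obtain the singleton instance, the same trick used
# for RackAwareMode in the diff above.
def scala_object(class_name)
  JRuby.runtime.jruby_class_loader.load_class(class_name).get_declared_field('MODULE$').get(nil)
end

# The mode passed to AdminUtils.get_broker_metadatas when force_rack is true.
enforced_mode = scala_object('kafka.admin.RackAwareMode$Enforced$')
```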
7 changes: 4 additions & 3 deletions lib/ktl.rb
@@ -1,8 +1,11 @@
# encoding: utf-8

+unless defined?($SLF4J_BACKEND)
+$SLF4J_BACKEND = 'log4j12'
+end

require 'thor'
require 'json'
-require 'heller'
require 'logger'
require 'ext/kafka'
require 'ext/thor'
@@ -36,9 +39,7 @@ def warn?; false end
require 'ktl/command'
require 'ktl/cluster'
require 'ktl/cluster_stats_task'
-require 'ktl/consumer'
require 'ktl/decommission_plan'
-require 'ktl/kafka_client'
require 'ktl/migration_plan'
require 'ktl/reassigner'
require 'ktl/reassignment_progress'
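Because the new guard at the top of `lib/ktl.rb` only assigns `$SLF4J_BACKEND` when it is undefined, a caller can select a different SLF4J binding by defining the global before requiring the library. A sketch of that usage (illustrative, not part of this commit; the binding name is hypothetical and must correspond to a jar that is actually on the classpath):

```ruby
# Define the global first; lib/ktl.rb then skips its own default of 'log4j12'.
$SLF4J_BACKEND = 'nop'  # hypothetical choice, e.g. the slf4j-nop binding

require 'ktl'
```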
3 changes: 0 additions & 3 deletions lib/ktl/cli.rb
@@ -2,9 +2,6 @@

module Ktl
class Cli < Thor
-desc 'consumer SUBCOMMAND ...ARGS', 'Commands for managing consumers'
-subcommand 'consumer', Consumer
-
desc 'cluster SUBCOMMAND ...ARGS', 'Commands for managing a cluster'
subcommand 'cluster', Cluster

9 changes: 8 additions & 1 deletion lib/ktl/cluster.rb
@@ -45,12 +45,19 @@ def migrate_broker
option :brokers, type: :array, desc: 'Broker IDs'
option :blacklist, type: :array, desc: 'Broker IDs to exclude'
option :rendezvous, aliases: %w[-R], type: :boolean, desc: 'Whether to use Rendezvous-hashing based shuffle'
+option :rack_aware, aliases: %w[-a], type: :boolean, desc: 'Whether to use Rack aware + Rendezvous-hashing based shuffle'
option :replication_factor, aliases: %w[-r], type: :numeric, desc: 'Replication factor to use'
option :limit, aliases: %w[-l], type: :numeric, desc: 'Max number of partitions to reassign at a time'
option :zookeeper, aliases: %w[-z], required: true, desc: 'ZooKeeper URI'
def shuffle(regexp='.*')
with_zk_client do |zk_client|
-plan_factory = options.rendezvous ? RendezvousShufflePlan : ShufflePlan
+plan_factory = if options.rack_aware
+RackAwareShufflePlan
+elsif options.rendezvous
+RendezvousShufflePlan
+else
+ShufflePlan
+end
plan = plan_factory.new(zk_client, {
filter: Regexp.new(regexp),
brokers: options.brokers,
2 changes: 1 addition & 1 deletion lib/ktl/cluster_stats_task.rb
@@ -19,7 +19,7 @@ def execute
brokers.foreach do |broker|
leader_for = ownership[broker.id]
share = leader_for.fdiv(partitions.size.to_f) * 100
-@shell.say ' - %d (%s) leader for %d partitions (%.2f %%)' % [broker.id, broker.host, leader_for, share]
+@shell.say ' - %d leader for %d partitions (%.2f %%)' % [broker.id, leader_for, share]
end
end

10 changes: 2 additions & 8 deletions lib/ktl/command.rb
@@ -13,18 +13,12 @@ def with_zk_client
rescue => e
logger.error '%s (%s)' % [e.message, e.class.name]
logger.debug e.backtrace.join($/)
+$stderr.puts '%s (%s)' % [e.message, e.class.name]
+$stderr.puts e.backtrace.join($/)
ensure
zk_client.close if zk_client
end

-def with_kafka_client(options={})
-brokers = with_zk_client { |zk_client| ScalaEnumerable.new(zk_client.brokers).map(&:connection_string) }
-kafka_client = KafkaClient.create(options.merge(hosts: brokers))
-yield kafka_client
-ensure
-kafka_client.close if kafka_client
-end

def logger
@logger ||= Logger.new($stdout).tap do |log|
log.formatter = ShellFormater.new(shell)
14 changes: 0 additions & 14 deletions lib/ktl/consumer.rb

This file was deleted.

118 changes: 0 additions & 118 deletions lib/ktl/kafka_client.rb

This file was deleted.

4 changes: 2 additions & 2 deletions lib/ktl/migration_plan.rb
@@ -4,8 +4,8 @@ module Ktl
class MigrationPlan
def initialize(zk_client, old_leader, new_leader)
@zk_client = zk_client
-@old_leader = old_leader
-@new_leader = new_leader
+@old_leader = old_leader.to_java
+@new_leader = new_leader.to_java
end

def generate
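For context on the `.to_java` calls above: in JRuby they convert the Ruby integers into boxed Java objects once, in the constructor, rather than at each later use. An illustrative sketch, not taken from the commit:

```ruby
# Calling to_java on a Ruby integer without an argument returns a boxed
# java.lang.Long, which interoperates directly with Java and Scala APIs.
old_leader = 3
old_leader.to_java.class  # => Java::JavaLang::Long
```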
8 changes: 6 additions & 2 deletions lib/ktl/reassigner.rb
@@ -89,11 +89,15 @@ def write_overflow(reassignments)
end

def reassignment_json(reassignment)
-Kafka::Utils::ZkUtils.get_partition_reassignment_zk_data(reassignment)
+zk_utils.format_as_reassignment_json(reassignment)
end

def parse_reassignment_json(json)
-Kafka::Utils::ZkUtils.parse_partition_reassignment_data(json)
+zk_utils.parse_partition_reassignment_data(json)
end

+def zk_utils
+@zk_utils ||= Kafka::Utils::ZkUtils.new(nil, nil, false)
+end

def maybe_split_by_limit(reassignment, limit=nil)
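The new `zk_utils` helper builds a `ZkUtils` with nil ZooKeeper handles, so it is only good for the JSON helpers used here (`format_as_reassignment_json`, `parse_partition_reassignment_data`), never for live ZooKeeper calls. A minimal sketch of the parsing side, assuming the same JRuby environment as ktl; the topic name and JSON payload are made up for illustration:

```ruby
require 'ext/kafka'

# Connection-less ZkUtils: the nil client and connection are acceptable because
# only the JSON (de)serialization helpers are called.
zk_utils = Kafka::Utils::ZkUtils.new(nil, nil, false)

# Illustrative reassignment JSON payload.
json = '{"version":1,"partitions":[{"topic":"test.topic","partition":0,"replicas":[0,1]}]}'

# Returns a Scala Map[TopicAndPartition, Seq[Int]]; wrap it to iterate from Ruby.
reassignment = zk_utils.parse_partition_reassignment_data(json)
ScalaEnumerable.new(reassignment).each do |tuple|
  topic_partition = tuple.first
  replicas = tuple._2
  puts "#{topic_partition.topic}-#{topic_partition.partition} -> #{replicas}"
end
```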
