How to change replication-factor of topic #1238

Closed
DoubleLuck opened this issue Dec 18, 2018 · 9 comments · Fixed by #1595

Comments

@DoubleLuck

DoubleLuck commented Dec 18, 2018

Hi all, I'm trying to use sarama to increase the replication factor of a topic, but without success. I need your help.

Versions

Sarama Version: 97315fe
Kafka Version: 1.0.0 (scala 2.12)
Go Version: 1.11 (windows/amd64)

Configuration

Sarama Config: config.Version = sarama.V1_0_0_0
Kafka Config: 3 brokers on local CentOS, id : 0 , 1, 2
Topic Info: test444 partition 1 replication-factor 1

	Topic: test444	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test444	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

I have set sarama.Logger:

	logger := log.New(os.Stderr, "[sarama] ", log.LstdFlags)
	sarama.Logger = logger

Step 1
func Replication(topic string, replicationFactor int) error {
	rp := strconv.Itoa(replicationFactor)
	entries := make(map[string]*string)
	entries["ReplicationFactor"] = &rp
	err := admin.AlterConfig(sarama.TopicResource, topic, entries, false)
	if err != nil {
		beego.Error(err)
		return err
	}
	return nil
}
  • kafka log:
    org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: ReplicationFactor
  • sarama: no log

Step 2
clusterAdmin.CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool)
  • parameter: ("test444", 1, [[0, 1]], false)
  • result: sarama returns an error (no sarama log, no broker log):
    kafka server: Number of partitions is invalid.
  • parameter: ("test444", 3, [[0, 1], [1,2], [2,0]], false)
  • result: sarama returns an error (no sarama log, no broker log):
    kafka server: Replica assignment is invalid.
  • parameter: ("test444", 3, [[0, 1], [1,2]], false)
  • result: sarama returns an error (no sarama log, no broker log):
    kafka server: Replica assignment is invalid.

Maybe I'm passing the wrong parameters or using an incorrect config, but I haven't managed to increase the replication factor. Please help me, thank you!

@varun06
Contributor

varun06 commented Mar 10, 2019

Looks like it should work. Are you still facing this issue, if not, can you please close this issue?

@DoubleLuck
Author

> Looks like it should work. Are you still facing this issue, if not, can you please close this issue?

I haven't been working on this recently. I'll try again later, thanks.

@sladkoff
Contributor

I'd like to re-open this.

Getting the same error "Unknown topic config name: ReplicationFactor".

Sarama version: 1.26.0
Kafka version: 2.3.1
Go version: 1.13.0

@d1egoaz
Contributor

d1egoaz commented Jan 31, 2020

is this even supported by the kafka admin API?

@d1egoaz
Contributor

d1egoaz commented Jan 31, 2020

As far as I know, in order to change the replication factor you'll need to:

  1. create a JSON file with the new replica assignments
  2. use the kafka-reassign-partitions tool to execute the new plan

The sarama admin example is misleading; there is no support for changing a topic's replication factor on the fly.
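
The first step above can be sketched in Go. This is only an illustration, assuming the commonly documented reassignment file layout (a `version` field plus a `partitions` list of topic, partition and replicas). The helper names `buildPlan` and `planJSON` are hypothetical and not part of sarama; the topic and broker IDs are the ones from the original post.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// partitionPlan is one entry of the reassignment file.
type partitionPlan struct {
	Topic     string  `json:"topic"`
	Partition int32   `json:"partition"`
	Replicas  []int32 `json:"replicas"`
}

// reassignmentPlan is the top-level document read by kafka-reassign-partitions.
type reassignmentPlan struct {
	Version    int             `json:"version"`
	Partitions []partitionPlan `json:"partitions"`
}

// buildPlan is a hypothetical helper: it gives every partition the same set
// of brokers, rotated by partition number so that the first (preferred)
// replica differs from one partition to the next.
func buildPlan(topic string, partitions int, brokers []int32) reassignmentPlan {
	plan := reassignmentPlan{Version: 1}
	for p := 0; p < partitions; p++ {
		replicas := make([]int32, len(brokers))
		for i := range brokers {
			replicas[i] = brokers[(p+i)%len(brokers)]
		}
		plan.Partitions = append(plan.Partitions, partitionPlan{
			Topic:     topic,
			Partition: int32(p),
			Replicas:  replicas,
		})
	}
	return plan
}

// planJSON renders the plan as the JSON text to save to a file.
func planJSON(plan reassignmentPlan) (string, error) {
	b, err := json.Marshal(plan)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// Topic test444 (1 partition) spread over brokers 0, 1 and 2.
	out, err := planJSON(buildPlan("test444", 1, []int32{0, 1, 2}))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```

The printed JSON would then be saved to a file and passed to kafka-reassign-partitions via `--reassignment-json-file` together with `--execute`; whether the tool is pointed at ZooKeeper or at a bootstrap server depends on the Kafka version in use.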

@d1egoaz
Contributor

d1egoaz commented Jan 31, 2020

@DoubleLuck @sladkoff I also have tested the CreatePartitions method.

It's working for me with:

err = admin.CreatePartitions(TestSinglePartitionTopic, 7, nil, false)
if err != nil {
	// error handling omitted in the original comment
}

kafkacat -Lb localhost:9092
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 1 brokers:
  broker 1001 at kafka:9092 (controller)
 3 topics:
  topic "test.1.nokey" with 7 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
    partition 1, leader 1001, replicas: 1001, isrs: 1001
    partition 2, leader 1001, replicas: 1001, isrs: 1001
    partition 3, leader 1001, replicas: 1001, isrs: 1001
    partition 4, leader 1001, replicas: 1001, isrs: 1001
    partition 5, leader 1001, replicas: 1001, isrs: 1001
    partition 6, leader 1001, replicas: 1001, isrs: 1001

or using an assignment plan:

number of brokers: 1
broker[0] id: 1001
current number of partitions: 1
current assignment: map[0:[1001]]

target: 4

You'll need to provide an assignment plan for the new partitions (3 new partitions)

	err = admin.CreatePartitions(TestSinglePartitionTopic, 4, [][]int32{{1001}, {1001}, {1001}}, false)
	if err != nil {
		fmt.Printf("   >>> kafkabridge_integration_test.go/: err: %T: %+v\n", err, err)
	}

current assignment after the change: map[0:[1001] 1:[1001] 2:[1001] 3:[1001]]

╰─○ kafkacat -Lb localhost:9092
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 1 brokers:
  broker 1001 at kafka:9092 (controller)
 3 topics:
  topic "test.1.nokey" with 4 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
    partition 1, leader 1001, replicas: 1001, isrs: 1001
    partition 2, leader 1001, replicas: 1001, isrs: 1001
    partition 3, leader 1001, replicas: 1001, isrs: 1001

I think this could be in the docs; it's not straightforward to infer, and it took me a bit to understand it.

@sladkoff
Contributor

sladkoff commented Feb 3, 2020

@d1egoaz How can we use this to change the replica assignment for existing partitions? In your comment you create new partitions for the new assignment - your initial partition replication assignment is unchanged, isn't it?

Assume I have three broker IDs [1, 2, 3] and 3 partitions, each with replica assignment [1].

I want to update the assignment for these partitions to [1, 2, 3].

If I call CreatePartitions without increasing the partition count:

// assignment: [[1,2,3], [1,2,3], [1,2,3]]
admin.CreatePartitions("test-topic", 3, assignment, false)

I get the error "Number of partitions is invalid. - Topic already has 3 partitions."

I tried adding a new partition while also changing the replica assignment for all (now 4) partitions. Then I get the following error:

Replica assignment is invalid. - Increasing the number of partitions by 1 but 4 assignments provided.

It seems to me that it's only possible to set the replica assignment for new partitions, which isn't sufficient for changing existing ones. Can you verify this?
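
The errors reported in this thread are consistent with a small set of checks, sketched below in Go. This is not broker or sarama code: `checkCreatePartitions` is a hypothetical helper, and the third rule (new partitions must use the topic's existing replication factor) is an inference from the third call in the original post, which the thread does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errPartitions = errors.New("number of partitions is invalid")
	errAssignment = errors.New("replica assignment is invalid")
)

// checkCreatePartitions is a hypothetical pre-flight check that mimics the
// rejections seen in this thread. current is the existing partition count,
// target the requested total, replicationFactor the topic's current factor.
func checkCreatePartitions(current, target, replicationFactor int, assignment [][]int32) error {
	// Rule 1: the total must grow; keeping the count unchanged is rejected.
	if target <= current {
		return errPartitions
	}
	// A nil assignment leaves replica placement to the broker.
	if assignment == nil {
		return nil
	}
	// Rule 2: one replica list per NEW partition, none for existing ones.
	if len(assignment) != target-current {
		return errAssignment
	}
	// Rule 3 (assumed): new partitions keep the existing replication factor.
	for _, replicas := range assignment {
		if len(replicas) != replicationFactor {
			return errAssignment
		}
	}
	return nil
}

func main() {
	// The three calls from the original post: test444 has 1 partition, factor 1.
	fmt.Println(checkCreatePartitions(1, 1, 1, [][]int32{{0, 1}}))
	fmt.Println(checkCreatePartitions(1, 3, 1, [][]int32{{0, 1}, {1, 2}, {2, 0}}))
	fmt.Println(checkCreatePartitions(1, 3, 1, [][]int32{{0, 1}, {1, 2}}))
	// A call shaped like the one reported to work: 1 -> 4 partitions, 3 new lists.
	fmt.Println(checkCreatePartitions(1, 4, 1, [][]int32{{1001}, {1001}, {1001}}))
}
```

Under these rules none of the calls can raise the replication factor of a partition that already exists, which matches the conclusion above.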

@sladkoff
Contributor

sladkoff commented Feb 3, 2020

I'm assuming that AlterPartitionReassignments needs to be implemented in order to edit assignments for existing partitions.

@d1egoaz I'd like to try and contribute this if you agree that this is the way to continue.
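
A rough sketch of how such a call might be used for the scenario above (3 partitions on broker 1, widened to brokers 1, 2, 3). Everything here is an assumption for illustration: the `reassigner` interface and its method signature are a guess at what a client-side AlterPartitionReassignments could look like, `widenReplicas` and `logAdmin` are hypothetical, and the real signature should be checked against the Sarama release in use.

```go
package main

import (
	"errors"
	"fmt"
)

// reassigner is an ASSUMED interface, not copied from sarama. The slice
// index of assignment is taken to be the partition number.
type reassigner interface {
	AlterPartitionReassignments(topic string, assignment [][]int32) error
}

// widenReplicas is a hypothetical helper. For each partition it keeps the
// existing replicas first, then appends unused brokers until rf is reached.
func widenReplicas(current [][]int32, brokers []int32, rf int) ([][]int32, error) {
	if rf > len(brokers) {
		return nil, errors.New("replication factor exceeds broker count")
	}
	out := make([][]int32, len(current))
	for p, replicas := range current {
		if len(replicas) > rf {
			return nil, fmt.Errorf("partition %d already has more than %d replicas", p, rf)
		}
		seen := make(map[int32]bool, rf)
		next := make([]int32, 0, rf)
		for _, id := range replicas {
			if !seen[id] {
				seen[id] = true
				next = append(next, id)
			}
		}
		for _, id := range brokers {
			if len(next) >= rf {
				break
			}
			if !seen[id] {
				seen[id] = true
				next = append(next, id)
			}
		}
		if len(next) != rf {
			return nil, fmt.Errorf("not enough distinct brokers for partition %d", p)
		}
		out[p] = next
	}
	return out, nil
}

// logAdmin is a stand-in that only prints what it would send.
type logAdmin struct{}

func (logAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
	fmt.Printf("reassign %s -> %v\n", topic, assignment)
	return nil
}

func main() {
	var admin reassigner = logAdmin{}
	current := [][]int32{{1}, {1}, {1}}
	target, err := widenReplicas(current, []int32{1, 2, 3}, 3)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	if err := admin.AlterPartitionReassignments("test-topic", target); err != nil {
		fmt.Println("error:", err)
	}
}
```

Keeping the existing replicas at the front of each list is a design choice in this sketch: it avoids moving data that is already in place and leaves the current preferred leader unchanged.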

@d1egoaz
Contributor

d1egoaz commented Feb 5, 2020

That'd be awesome! go for it!
