Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for alter/list partition reassignements APIs #1617

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in

request.AddBlock(topic, partitions)

b, err := ca.findAnyBroker()
b, err := ca.Controller()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now this is using the controller you probably want to wrapper the body in a return ca.retryOnError(isErrNoController, ...) call like you did for AlterPartitionReassignments so it refreshes the cached controller if it is stale

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the retryOnError function cannot be used when a value needs to be returned. other functions in admin.go where a controller is used and a return type is needed also don't use retrying. How should this be handled? should we implement a retry function that returns something like interface{}?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dwi-di ah of course — let's just leave this for now and we can always follow-up in another PR

if err != nil {
return nil, err
}
Expand Down
44 changes: 36 additions & 8 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,17 @@ func TestClusterAdminAlterPartitionReassignments(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

secondBroker := NewMockBroker(t, 2)
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
})

Expand Down Expand Up @@ -367,10 +374,17 @@ func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

secondBroker := NewMockBroker(t, 2)
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
})

Expand Down Expand Up @@ -399,10 +413,17 @@ func TestClusterAdminListPartitionReassignments(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

secondBroker := NewMockBroker(t, 2)
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
})

Expand Down Expand Up @@ -437,10 +458,17 @@ func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

secondBroker := NewMockBroker(t, 2)
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
})

Expand Down