Skip to content

Commit

Permalink
add ListPartitionReassignments to admin client
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk committed Feb 17, 2020
1 parent 8a52f4a commit 601e78f
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 21 deletions.
38 changes: 33 additions & 5 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ type ClusterAdmin interface {
// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

// Update the configuration for the specified resources with the default options.
// This operation is supported by brokers with version 0.11.0.0 or higher.
// The resources with their configs (topic is the only resource type with configs
// that can be updated currently Updates are not transactional so they may succeed
// for some resources while fail for others. The configs for a particular resource are updated automatically.
// Alter the replica assignment for partitions.
// This operation is supported by brokers with version 2.4.0.0 or higher.
AlterPartitionReassignments(topic string, assignment [][]int32) error

// Provides info on ongoing partitions replica reassignments.
// This operation is supported by brokers with version 2.4.0.0 or higher.
ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)

// Delete records whose offset is smaller than the given offset of the corresponding partition.
// This operation is supported by brokers with version 0.11.0.0 or higher.
DeleteRecords(topic string, partitionOffsets map[int32]int64) error
Expand Down Expand Up @@ -500,6 +501,33 @@ func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][
})
}

func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
if topic == "" {
return nil, ErrInvalidTopic
}

request := &ListPartitionReassignmentsRequest{
TimeoutMs: int32(10000),
Version: int16(0),
}

request.AddBlock(topic, partitions)

b, err := ca.findAnyBroker()
if err != nil {
return nil, err
}
_ = b.Open(ca.client.Config())

rsp, err := b.ListPartitionReassignments(request)

if err == nil && rsp != nil {
return rsp.TopicStatus, nil
} else {
return nil, err
}
}

func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
if topic == "" {
return ErrInvalidTopic
Expand Down
70 changes: 70 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,76 @@ func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) {
}
}

func TestClusterAdminListPartitionReassignments(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_4_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

response, err := admin.ListPartitionReassignments("my_topic", []int32{0, 1})
if err != nil {
t.Fatal(err)
}

partitionStatus, ok := response["my_topic"]
if !ok {
t.Fatalf("topic missing in response")
} else {
if len(partitionStatus) != 2 {
t.Fatalf("partition missing in response")
}
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var partitions = make([]int32, 0)

_, err = admin.ListPartitionReassignments("my_topic", partitions)

if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminDeleteRecords(t *testing.T) {
topicName := "my_topic"
seedBroker := NewMockBroker(t, 1)
Expand Down
32 changes: 16 additions & 16 deletions list_partition_reassignments_response.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package sarama

type listPartitionReassignmentsResponseBlock struct {
type PartitionReplicaReassignmentsStatus struct {
replicas []int32
addingReplicas []int32
removingReplicas []int32
}

func (b *listPartitionReassignmentsResponseBlock) encode(pe packetEncoder) error {
func (b *PartitionReplicaReassignmentsStatus) encode(pe packetEncoder) error {

if err := pe.putCompactInt32Array(b.replicas); err != nil {
return err
Expand All @@ -23,7 +23,7 @@ func (b *listPartitionReassignmentsResponseBlock) encode(pe packetEncoder) error
return nil
}

func (b *listPartitionReassignmentsResponseBlock) decode(pd packetDecoder) (err error) {
func (b *PartitionReplicaReassignmentsStatus) decode(pd packetDecoder) (err error) {

if b.replicas, err = pd.getCompactInt32Array(); err != nil {
return err
Expand All @@ -49,20 +49,20 @@ type ListPartitionReassignmentsResponse struct {
ThrottleTimeMs int32
ErrorCode KError
ErrorMessage *string
blocks map[string]map[int32]*listPartitionReassignmentsResponseBlock
TopicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus
}

func (r *ListPartitionReassignmentsResponse) AddBlock(topic string, partition int32, replicas, addingReplicas, removingReplicas []int32) {
if r.blocks == nil {
r.blocks = make(map[string]map[int32]*listPartitionReassignmentsResponseBlock)
if r.TopicStatus == nil {
r.TopicStatus = make(map[string]map[int32]*PartitionReplicaReassignmentsStatus)
}
partitions := r.blocks[topic]
partitions := r.TopicStatus[topic]
if partitions == nil {
partitions = make(map[int32]*listPartitionReassignmentsResponseBlock)
r.blocks[topic] = partitions
partitions = make(map[int32]*PartitionReplicaReassignmentsStatus)
r.TopicStatus[topic] = partitions
}

partitions[partition] = &listPartitionReassignmentsResponseBlock{replicas: replicas, addingReplicas: addingReplicas, removingReplicas: removingReplicas}
partitions[partition] = &PartitionReplicaReassignmentsStatus{replicas: replicas, addingReplicas: addingReplicas, removingReplicas: removingReplicas}
}

func (r *ListPartitionReassignmentsResponse) encode(pe packetEncoder) error {
Expand All @@ -72,8 +72,8 @@ func (r *ListPartitionReassignmentsResponse) encode(pe packetEncoder) error {
return err
}

pe.putCompactArrayLength(len(r.blocks))
for topic, partitions := range r.blocks {
pe.putCompactArrayLength(len(r.TopicStatus))
for topic, partitions := range r.TopicStatus {
if err := pe.putCompactString(topic); err != nil {
return err
}
Expand Down Expand Up @@ -116,7 +116,7 @@ func (r *ListPartitionReassignmentsResponse) decode(pd packetDecoder, version in
return err
}

r.blocks = make(map[string]map[int32]*listPartitionReassignmentsResponseBlock, numTopics)
r.TopicStatus = make(map[string]map[int32]*PartitionReplicaReassignmentsStatus, numTopics)
for i := 0; i < numTopics; i++ {
topic, err := pd.getCompactString()
if err != nil {
Expand All @@ -128,19 +128,19 @@ func (r *ListPartitionReassignmentsResponse) decode(pd packetDecoder, version in
return err
}

r.blocks[topic] = make(map[int32]*listPartitionReassignmentsResponseBlock, ongoingPartitionReassignments)
r.TopicStatus[topic] = make(map[int32]*PartitionReplicaReassignmentsStatus, ongoingPartitionReassignments)

for j := 0; j < ongoingPartitionReassignments; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}

block := &listPartitionReassignmentsResponseBlock{}
block := &PartitionReplicaReassignmentsStatus{}
if err := block.decode(pd); err != nil {
return err
}
r.blocks[topic][partition] = block
r.TopicStatus[topic][partition] = block
}

if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
Expand Down
23 changes: 23 additions & 0 deletions mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,29 @@ func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder)
return res
}

type MockListPartitionReassignmentsResponse struct {
t TestReporter
}

func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {
return &MockListPartitionReassignmentsResponse{t: t}
}

func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*ListPartitionReassignmentsRequest)
_ = req
res := &ListPartitionReassignmentsResponse{}

for topic, partitions := range req.blocks {

for _, partition := range partitions {
res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2})
}
}

return res
}

type MockDeleteRecordsResponse struct {
t TestReporter
}
Expand Down

0 comments on commit 601e78f

Please sign in to comment.