Skip to content

Commit

Permalink
Merge pull request #1966 from ajanikow/add_incremental_alter_config_o…
Browse files Browse the repository at this point in the history
…ptions

KIP-339: Add Incremental Config updates API
  • Loading branch information
dnwe committed Jun 14, 2021
2 parents d01b344 + 50caedd commit a5128bc
Show file tree
Hide file tree
Showing 7 changed files with 439 additions and 28 deletions.
71 changes: 43 additions & 28 deletions alter_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,9 @@ func (a *AlterConfigsResponse) encode(pe packetEncoder) error {
return err
}

for i := range a.Resources {
pe.putInt16(a.Resources[i].ErrorCode)
err := pe.putString(a.Resources[i].ErrorMsg)
if err != nil {
return nil
}
pe.putInt8(int8(a.Resources[i].Type))
err = pe.putString(a.Resources[i].Name)
if err != nil {
return nil
for _, v := range a.Resources {
if err := v.encode(pe); err != nil {
return err
}
}

Expand All @@ -56,30 +49,52 @@ func (a *AlterConfigsResponse) decode(pd packetDecoder, version int16) error {
for i := range a.Resources {
a.Resources[i] = new(AlterConfigsResourceResponse)

errCode, err := pd.getInt16()
if err != nil {
if err := a.Resources[i].decode(pd, version); err != nil {
return err
}
a.Resources[i].ErrorCode = errCode
}

e, err := pd.getString()
if err != nil {
return err
}
a.Resources[i].ErrorMsg = e
return nil
}

t, err := pd.getInt8()
if err != nil {
return err
}
a.Resources[i].Type = ConfigResourceType(t)
func (a *AlterConfigsResourceResponse) encode(pe packetEncoder) error {
pe.putInt16(a.ErrorCode)
err := pe.putString(a.ErrorMsg)
if err != nil {
return nil
}
pe.putInt8(int8(a.Type))
err = pe.putString(a.Name)
if err != nil {
return nil
}
return nil
}

name, err := pd.getString()
if err != nil {
return err
}
a.Resources[i].Name = name
func (a *AlterConfigsResourceResponse) decode(pd packetDecoder, version int16) error {
errCode, err := pd.getInt16()
if err != nil {
return err
}
a.ErrorCode = errCode

e, err := pd.getString()
if err != nil {
return err
}
a.ErrorMsg = e

t, err := pd.getInt8()
if err != nil {
return err
}
a.Type = ConfigResourceType(t)

name, err := pd.getString()
if err != nil {
return err
}
a.Name = name

return nil
}
Expand Down
12 changes: 12 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,18 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon
return response, nil
}

// IncrementalAlterConfigs sends a request to incremental alter config and return a response or error
func (b *Broker) IncrementalAlterConfigs(request *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error) {
response := new(IncrementalAlterConfigsResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

// DeleteGroups sends a request to delete groups and returns a response or error
func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
response := new(DeleteGroupsResponse)
Expand Down
173 changes: 173 additions & 0 deletions incremental_alter_configs_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package sarama

type IncrementalAlterConfigsOperation int8

const (
IncrementalAlterConfigsOperationSet IncrementalAlterConfigsOperation = iota
IncrementalAlterConfigsOperationDelete
IncrementalAlterConfigsOperationAppend
IncrementalAlterConfigsOperationSubtract
)

// IncrementalAlterConfigsRequest is an incremental alter config request type
type IncrementalAlterConfigsRequest struct {
Resources []*IncrementalAlterConfigsResource
ValidateOnly bool
}

type IncrementalAlterConfigsResource struct {
Type ConfigResourceType
Name string
ConfigEntries map[string]IncrementalAlterConfigsEntry
}

type IncrementalAlterConfigsEntry struct {
Operation IncrementalAlterConfigsOperation
Value *string
}

func (a *IncrementalAlterConfigsRequest) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(a.Resources)); err != nil {
return err
}

for _, r := range a.Resources {
if err := r.encode(pe); err != nil {
return err
}
}

pe.putBool(a.ValidateOnly)
return nil
}

func (a *IncrementalAlterConfigsRequest) decode(pd packetDecoder, version int16) error {
resourceCount, err := pd.getArrayLength()
if err != nil {
return err
}

a.Resources = make([]*IncrementalAlterConfigsResource, resourceCount)
for i := range a.Resources {
r := &IncrementalAlterConfigsResource{}
err = r.decode(pd, version)
if err != nil {
return err
}
a.Resources[i] = r
}

validateOnly, err := pd.getBool()
if err != nil {
return err
}

a.ValidateOnly = validateOnly

return nil
}

func (a *IncrementalAlterConfigsResource) encode(pe packetEncoder) error {
pe.putInt8(int8(a.Type))

if err := pe.putString(a.Name); err != nil {
return err
}

if err := pe.putArrayLength(len(a.ConfigEntries)); err != nil {
return err
}

for name, e := range a.ConfigEntries {
if err := pe.putString(name); err != nil {
return err
}

if err := e.encode(pe); err != nil {
return err
}
}

return nil
}

func (a *IncrementalAlterConfigsResource) decode(pd packetDecoder, version int16) error {
t, err := pd.getInt8()
if err != nil {
return err
}
a.Type = ConfigResourceType(t)

name, err := pd.getString()
if err != nil {
return err
}
a.Name = name

n, err := pd.getArrayLength()
if err != nil {
return err
}

if n > 0 {
a.ConfigEntries = make(map[string]IncrementalAlterConfigsEntry, n)
for i := 0; i < n; i++ {
name, err := pd.getString()
if err != nil {
return err
}

var v IncrementalAlterConfigsEntry

if err := v.decode(pd, version); err != nil {
return err
}

a.ConfigEntries[name] = v
}
}
return err
}

func (a *IncrementalAlterConfigsEntry) encode(pe packetEncoder) error {
pe.putInt8(int8(a.Operation))

if err := pe.putNullableString(a.Value); err != nil {
return err
}

return nil
}

func (a *IncrementalAlterConfigsEntry) decode(pd packetDecoder, version int16) error {
t, err := pd.getInt8()
if err != nil {
return err
}
a.Operation = IncrementalAlterConfigsOperation(t)

s, err := pd.getNullableString()
if err != nil {
return err
}

a.Value = s

return nil
}

func (a *IncrementalAlterConfigsRequest) key() int16 {
return 44
}

func (a *IncrementalAlterConfigsRequest) version() int16 {
return 0
}

func (a *IncrementalAlterConfigsRequest) headerVersion() int16 {
return 1
}

func (a *IncrementalAlterConfigsRequest) requiredVersion() KafkaVersion {
return V2_3_0_0
}
98 changes: 98 additions & 0 deletions incremental_alter_configs_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package sarama

import "testing"

var (
emptyIncrementalAlterConfigsRequest = []byte{
0, 0, 0, 0, // 0 configs
0, // don't Validate
}

singleIncrementalAlterConfigsRequest = []byte{
0, 0, 0, 1, // 1 config
2, // a topic
0, 3, 'f', 'o', 'o', // topic name: foo
0, 0, 0, 1, // 1 config name
0, 10, // 10 chars
's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, // OperationSet
0, 4,
'1', '0', '0', '0',
0, // don't validate
}

doubleIncrementalAlterConfigsRequest = []byte{
0, 0, 0, 2, // 2 config
2, // a topic
0, 3, 'f', 'o', 'o', // topic name: foo
0, 0, 0, 1, // 1 config name
0, 10, // 10 chars
's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, // OperationSet
0, 4,
'1', '0', '0', '0',
2, // a topic
0, 3, 'b', 'a', 'r', // topic name: foo
0, 0, 0, 1, // 2 config
0, 12, // 12 chars
'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's',
1, // OperationDelete
0, 4,
'1', '0', '0', '0',
0, // don't validate
}
)

func TestIncrementalAlterConfigsRequest(t *testing.T) {
var request *IncrementalAlterConfigsRequest

request = &IncrementalAlterConfigsRequest{
Resources: []*IncrementalAlterConfigsResource{},
}
testRequest(t, "no requests", request, emptyIncrementalAlterConfigsRequest)

configValue := "1000"
request = &IncrementalAlterConfigsRequest{
Resources: []*IncrementalAlterConfigsResource{
{
Type: TopicResource,
Name: "foo",
ConfigEntries: map[string]IncrementalAlterConfigsEntry{
"segment.ms": {
Operation: IncrementalAlterConfigsOperationSet,
Value: &configValue,
},
},
},
},
}

testRequest(t, "one config", request, singleIncrementalAlterConfigsRequest)

request = &IncrementalAlterConfigsRequest{
Resources: []*IncrementalAlterConfigsResource{
{
Type: TopicResource,
Name: "foo",
ConfigEntries: map[string]IncrementalAlterConfigsEntry{
"segment.ms": {
Operation: IncrementalAlterConfigsOperationSet,
Value: &configValue,
},
},
},
{
Type: TopicResource,
Name: "bar",
ConfigEntries: map[string]IncrementalAlterConfigsEntry{
"retention.ms": {
Operation: IncrementalAlterConfigsOperationDelete,
Value: &configValue,
},
},
},
},
}

testRequest(t, "two configs", request, doubleIncrementalAlterConfigsRequest)
}
Loading

0 comments on commit a5128bc

Please sign in to comment.