Skip to content

Commit

Permalink
feat: get metadata (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
medreza committed Dec 29, 2023
1 parent 0ec8868 commit 580eedb
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 0 deletions.
7 changes: 7 additions & 0 deletions blackhole.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package eventstream

import "time"

// BlackholeClient satisfies the publisher for mocking
type BlackholeClient struct{}

Expand Down Expand Up @@ -43,3 +45,8 @@ func (client *BlackholeClient) PublishAuditLog(auditLogBuilder *AuditLogBuilder)
// do nothing
return nil
}

func (client *BlackholeClient) GetMetadata(_ string, _ time.Duration) (*Metadata, error) {
// do nothing
return nil, nil
}
1 change: 1 addition & 0 deletions eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ type Client interface {
PublishSync(publishBuilder *PublishBuilder) error
Register(subscribeBuilder *SubscribeBuilder) error
PublishAuditLog(auditLogBuilder *AuditLogBuilder) error
GetMetadata(topic string, timeout time.Duration) (*Metadata, error)
}

type AuditLog struct {
Expand Down
54 changes: 54 additions & 0 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ type KafkaClient struct {
// current topic subscribed on the kafka client
topicSubscribedCount map[string]int

adminClient *kafka.AdminClient

stats statistics.Stats

statsLock sync.RWMutex
Expand Down Expand Up @@ -961,3 +963,55 @@ func newPublishBackoff() *backoff.ExponentialBackOff {
backoff.Multiplier = 4.0
return backoff
}

func (client *KafkaClient) getAdminClient(config *kafka.ConfigMap) (*kafka.AdminClient, error) {
if client.adminClient != nil {
return client.adminClient, nil
}

adminClient, err := kafka.NewAdminClient(config)
if err != nil {
return nil, err
}

return adminClient, nil
}

func (client *KafkaClient) GetMetadata(topic string, timeout time.Duration) (*Metadata, error) {
adminClient, err := client.getAdminClient(client.configMap)
if err != nil {
return nil, err
}

mTopic := &topic
if topic == "" {
mTopic = nil
}
metadata, err := adminClient.GetMetadata(mTopic, false, int(timeout.Milliseconds()))
if err != nil {
return nil, err
}

mBrokers := make([]BrokerMetadata, 0)
for _, m := range metadata.Brokers {
mBrokers = append(mBrokers, BrokerMetadata{
ID: m.ID,
Host: m.Host,
Port: m.Port,
})
}

return &Metadata{
Brokers: mBrokers,
}, nil
}

type BrokerMetadata struct {
ID int32
Host string
Port int
}

type Metadata struct {
Brokers []BrokerMetadata
}
15 changes: 15 additions & 0 deletions kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,21 @@ func TestKafkaUnregisterTopicSuccess(t *testing.T) {
}
}

func TestKafkaGetMetadata(t *testing.T) {
t.Parallel()
ctx, done := context.WithTimeout(context.Background(), time.Duration(timeoutTest)*time.Second)
defer done()

logrus.SetLevel(logrus.DebugLevel)

client := createKafkaClient(t)

deadline, _ := ctx.Deadline()
metadata, err := client.GetMetadata("", time.Until(deadline))
require.NoError(t, err)
assert.NotEmpty(t, metadata.Brokers[0].ID)
}

func callerFuncName() string {
pc, _, _, _ := runtime.Caller(2)
callerFunc := runtime.FuncForPC(pc)
Expand Down
5 changes: 5 additions & 0 deletions stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,8 @@ func (client *StdoutClient) PublishAuditLog(auditLogBuilder *AuditLogBuilder) er
fmt.Println(string(message.Value))
return nil
}

func (client *StdoutClient) GetMetadata(_ string, _ time.Duration) (*Metadata, error) {
// do nothing
return nil, nil
}

0 comments on commit 580eedb

Please sign in to comment.