Skip to content

Commit

Permalink
[INLONG-1525]Fix bug which will cause Go SDK fail to parse SubscribeI…
Browse files Browse the repository at this point in the history
…nfo (#1526)

Signed-off-by: Zijie Lu <wslzj40@gmail.com>
  • Loading branch information
TszKitLo40 committed Sep 6, 2021
1 parent f2e773e commit 11353eb
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 8 deletions.
13 changes: 9 additions & 4 deletions tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ func (c *consumer) sendRegRequest2Master() (*protocol.RegisterResponseM2C, error
node := &metadata.Node{}
node.SetHost(util.GetLocalHost())
node.SetAddress(c.master.Address)
auth := &protocol.AuthenticateInfo{}
if c.needGenMasterCertificateInfo(true) {
util.GenMasterAuthenticateToken(auth, c.config.Net.Auth.UserName, c.config.Net.Auth.Password)
}
m.SetNode(node)
sub := &metadata.SubscribeInfo{}
sub.SetGroup(c.config.Consumer.Group)
Expand Down Expand Up @@ -512,7 +516,7 @@ func (c *consumer) genBrokerAuthenticInfo(force bool) *protocol.AuthorizedInfo {
return auth
}

func (c *consumer) genMasterAuthenticateToken(auth *protocol.AuthenticateInfo, force bool) {
func (c *consumer) needGenMasterCertificateInfo(force bool) bool {
needAdd := false
if c.config.Net.Auth.Enable {
if force {
Expand All @@ -526,6 +530,7 @@ func (c *consumer) genMasterAuthenticateToken(auth *protocol.AuthenticateInfo, f
if needAdd {
}
}
return needAdd
}

func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 {
Expand Down Expand Up @@ -700,10 +705,10 @@ func (c *consumer) close2Master() error {
sub := &metadata.SubscribeInfo{}
sub.SetGroup(c.config.Consumer.Group)
m.SetSubscribeInfo(sub)
mci := &protocol.MasterCertificateInfo{}
auth := &protocol.AuthenticateInfo{}
c.genMasterAuthenticateToken(auth, true)
mci := &protocol.MasterCertificateInfo{
AuthInfo: auth,
if c.needGenMasterCertificateInfo(true) {
util.GenMasterAuthenticateToken(auth, c.config.Net.Auth.UserName, c.config.Net.Auth.Password)
}
c.subInfo.SetMasterCertificateInfo(mci)
rsp, err := c.client.CloseRequestC2M(ctx, m, c.subInfo)
Expand Down
4 changes: 4 additions & 0 deletions tubemq-client-twins/tubemq-client-go/client/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func (h *heartbeatManager) consumerHB2Master() {
sub := &metadata.SubscribeInfo{}
sub.SetGroup(h.consumer.config.Consumer.Group)
m.SetSubscribeInfo(sub)
auth := &protocol.AuthenticateInfo{}
if h.consumer.needGenMasterCertificateInfo(true) {
util.GenMasterAuthenticateToken(auth, h.consumer.config.Net.Auth.UserName, h.consumer.config.Net.Auth.Password)
}
h.consumer.unreportedTimes++
if h.consumer.unreportedTimes > h.consumer.config.Consumer.MaxSubInfoReportInterval {
m.SetReportTimes(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *SubscribeInfo) String() string {
// If the given is invalid, it will return error.
// The format of subscribeInfo string: consumerId@group#broker_info#topic:partitionId
func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) {
s := strings.Split(subscribeInfo, "#")
s := strings.SplitN(subscribeInfo, "#", 2)
if len(s) == 1 {
return nil, errs.ErrInvalidSubscribeInfoString
}
Expand Down
2 changes: 1 addition & 1 deletion tubemq-client-twins/tubemq-client-go/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func (r *RmtDataCache) removeFromIndexPartitions(partitionKey string) {
r.indexPartitions = nil
return
}
if pos >= len(r.indexPartitions) {
if pos == -1 || pos >= len(r.indexPartitions) {
return
}
r.indexPartitions = append(r.indexPartitions[:pos], r.indexPartitions[pos+1:]...)
Expand Down
5 changes: 3 additions & 2 deletions tubemq-client-twins/tubemq-client-go/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"net"
"strconv"
"strings"

"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
)

// InvalidValue defines the invalid value of TubeMQ config.
Expand Down Expand Up @@ -72,8 +74,7 @@ func GenBrokerAuthenticateToken(username string, password string) string {
}

// GenMasterAuthenticateToken generates the master authenticate token.
func GenMasterAuthenticateToken(username string, password string) string {
return ""
func GenMasterAuthenticateToken(authInfo *protocol.AuthenticateInfo, username string, password string) {
}

// ParseConfirmContext parses the confirm context to partition key and bookedTime.
Expand Down

0 comments on commit 11353eb

Please sign in to comment.