Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #759] Change ResetOffsetBody response parse method to support fastjson schema #765

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 73 additions & 4 deletions internal/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,39 @@ type ResetOffsetBody struct {
OffsetTable map[primitive.MessageQueue]int64 `json:"offsetTable"`
}

// Decode note: the origin implementation for parse json is in gson format.
// this func should support both gson and fastjson schema.
func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
validJSON := gjson.ValidBytes(body)

var offsetTable map[primitive.MessageQueue]int64

if validJSON {
offsetTable = parseGsonFormat(body)
} else {
offsetTable = parseFastJsonFormat(body)
}

resetOffsetBody.OffsetTable = offsetTable
}

func parseGsonFormat(body []byte) map[primitive.MessageQueue]int64 {
result := gjson.ParseBytes(body)

rlog.Debug("offset table string "+result.Get("offsetTable").String(), nil)

offsetTable := make(map[primitive.MessageQueue]int64, 0)
offsetTableArray := strings.Split(result.Get("offsetTable").String(), "],[")

offsetStr := result.Get("offsetTable").String()
if len(offsetStr) <= 2 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

请教下,何时会触发这个情况

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里就只是简单防御性检查,一般情况下不会走这个逻辑

rlog.Warning("parse reset offset table json get nothing in body", map[string]interface{}{
"origin json": offsetStr,
})
return offsetTable
}

offsetTableArray := strings.Split(offsetStr, "],[")

for index, v := range offsetTableArray {
kvArray := strings.Split(v, "},")

Expand All @@ -315,7 +342,7 @@ func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
rlog.Error("Unmarshal offset error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return
return nil
}

if index == 0 {
Expand All @@ -329,9 +356,51 @@ func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
rlog.Error("Unmarshal message queue error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return
return nil
}
offsetTable[*kObj] = offset
}
resetOffsetBody.OffsetTable = offsetTable

return offsetTable
}

func parseFastJsonFormat(body []byte) map[primitive.MessageQueue]int64 {
offsetTable := make(map[primitive.MessageQueue]int64)

jsonStr := string(body)
offsetStr := gjson.Get(jsonStr, "offsetTable").String()

if len(offsetStr) <= 2 {
rlog.Warning("parse reset offset table json get nothing in body", map[string]interface{}{
"origin json": jsonStr,
})
return offsetTable
}

trimStr := offsetStr[2 : len(offsetStr)-1]

split := strings.Split(trimStr, ",{")

for _, v := range split {
tuple := strings.Split(v, "}:")

queueStr := "{" + tuple[0] + "}"

var err error
// ignore err for now
offset, err := strconv.Atoi(tuple[1])

var queue primitive.MessageQueue
err = json.Unmarshal([]byte(queueStr), &queue)

if err != nil {
rlog.Error("parse reset offset table json get nothing in body", map[string]interface{}{
"origin json": jsonStr,
})
}

offsetTable[queue] = int64(offset)
}

return offsetTable
}
50 changes: 49 additions & 1 deletion internal/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
}

func TestRestOffsetBody_MarshalJSON(t *testing.T) {
Convey("test ResetOffset Body Decode", t, func() {
Convey("test ResetOffset Body Decode gson json schema", t, func() {
body := "{\"offsetTable\":[[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":5},23354233],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":4},23354245],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":7},23354203],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":6},23354312],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":1},23373517],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":0},23373350],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":3},23373424],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":2},23373382]]}"
resetOffsetBody := new(ResetOffsetBody)
resetOffsetBody.Decode([]byte(body))
Expand All @@ -433,4 +433,52 @@ func TestRestOffsetBody_MarshalJSON(t *testing.T) {
}
So(offsetTable[messageQueue], ShouldEqual, 23354233)
})

Convey("test ResetOffset Body Decode fast json schema", t, func() {
body := "{\"offsetTable\":{{\"brokerName\":\"RaftNode00\",\"queueId\":0,\"topic\":\"topicB\"}:11110,{\"brokerName\":\"RaftNode00\",\"queueId\":1,\"topic\":\"topicB\"}:0,{\"brokerName\":\"RaftNode00\",\"queueId\":2,\"topic\":\"topicB\"}:0,{\"brokerName\":\"RaftNode00\",\"queueId\":3,\"topic\":\"topicB\"}:0}}"
resetOffsetBody := new(ResetOffsetBody)
resetOffsetBody.Decode([]byte(body))
offsetTable := resetOffsetBody.OffsetTable
So(offsetTable, ShouldNotBeNil)
So(len(offsetTable), ShouldEqual, 4)
messageQueue := primitive.MessageQueue{
Topic: "topicB",
BrokerName: "RaftNode00",
QueueId: 0,
}
So(offsetTable[messageQueue], ShouldEqual, 11110)
})

Convey("test ResetOffset Body Decode fast json schema with one item", t, func() {
body := "{\"offsetTable\":{{\"brokerName\":\"RaftNode00\",\"queueId\":0,\"topic\":\"topicB\"}:11110}}"
resetOffsetBody := new(ResetOffsetBody)
resetOffsetBody.Decode([]byte(body))
offsetTable := resetOffsetBody.OffsetTable
So(offsetTable, ShouldNotBeNil)
So(len(offsetTable), ShouldEqual, 1)
messageQueue := primitive.MessageQueue{
Topic: "topicB",
BrokerName: "RaftNode00",
QueueId: 0,
}
So(offsetTable[messageQueue], ShouldEqual, 11110)
})

Convey("test ResetOffset Body Decode empty fast json ", t, func() {
body := "{\"offsetTable\":{}}"
resetOffsetBody := new(ResetOffsetBody)
resetOffsetBody.Decode([]byte(body))
offsetTable := resetOffsetBody.OffsetTable
So(offsetTable, ShouldNotBeNil)
So(len(offsetTable), ShouldEqual, 0)
})

Convey("test ResetOffset Body Decode empty gson json ", t, func() {
body := "{\"offsetTable\":[]}"
resetOffsetBody := new(ResetOffsetBody)
resetOffsetBody.Decode([]byte(body))
offsetTable := resetOffsetBody.OffsetTable
So(offsetTable, ShouldNotBeNil)
So(len(offsetTable), ShouldEqual, 0)
})
}