Skip to content

Commit

Permalink
[ISSUE #759]
Browse files Browse the repository at this point in the history
ISSUE #759] Change ResetOffsetBody response parse method to support fastjson schema
  • Loading branch information
WJL3333 committed Jan 3, 2022
1 parent de5f561 commit 83f60c1
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 5 deletions.
77 changes: 73 additions & 4 deletions internal/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,39 @@ type ResetOffsetBody struct {
OffsetTable map[primitive.MessageQueue]int64 `json:"offsetTable"`
}

// Decode note: the origin implementation for parse json is in gson format.
// this func should support both gson and fastjson schema.
func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
validJSON := gjson.ValidBytes(body)

var offsetTable map[primitive.MessageQueue]int64

if validJSON {
offsetTable = parseGsonFormat(body)
} else {
offsetTable = parseFastJsonFormat(body)
}

resetOffsetBody.OffsetTable = offsetTable
}

func parseGsonFormat(body []byte) map[primitive.MessageQueue]int64 {
result := gjson.ParseBytes(body)

rlog.Debug("offset table string "+result.Get("offsetTable").String(), nil)

offsetTable := make(map[primitive.MessageQueue]int64, 0)
offsetTableArray := strings.Split(result.Get("offsetTable").String(), "],[")

offsetStr := result.Get("offsetTable").String()
if len(offsetStr) <= 2 {
rlog.Warning("parse reset offset table json get nothing in body", map[string]interface{}{
"origin json": offsetStr,
})
return offsetTable
}

offsetTableArray := strings.Split(offsetStr, "],[")

for index, v := range offsetTableArray {
kvArray := strings.Split(v, "},")

Expand All @@ -315,7 +342,7 @@ func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
rlog.Error("Unmarshal offset error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return
return nil
}

if index == 0 {
Expand All @@ -329,9 +356,51 @@ func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
rlog.Error("Unmarshal message queue error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return
return nil
}
offsetTable[*kObj] = offset
}
resetOffsetBody.OffsetTable = offsetTable

return offsetTable
}

func parseFastJsonFormat(body []byte) map[primitive.MessageQueue]int64 {
offsetTable := make(map[primitive.MessageQueue]int64)

jsonStr := string(body)
offsetStr := gjson.Get(jsonStr, "offsetTable").String()

if len(offsetStr) <= 2 {
rlog.Warning("parse reset offset table json get nothing in body", map[string]interface{}{
"origin json": jsonStr,
})
return offsetTable
}

trimStr := offsetStr[2 : len(offsetStr)-1]

split := strings.Split(trimStr, ",{")

for _, v := range split {
tuple := strings.Split(v, "}:")

queueStr := "{" + tuple[0] + "}"

var err error
// ignore err for now
offset, err := strconv.Atoi(tuple[1])

var queue primitive.MessageQueue
err = json.Unmarshal([]byte(queueStr), &queue)

if err != nil {
rlog.Error("parse reset offset table json get nothing in body", map[string]interface{}{
"origin json": jsonStr,
})
}

offsetTable[queue] = int64(offset)
}

return offsetTable
}
50 changes: 49 additions & 1 deletion internal/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
}

func TestRestOffsetBody_MarshalJSON(t *testing.T) {
Convey("test ResetOffset Body Decode", t, func() {
Convey("test ResetOffset Body Decode gson json schema", t, func() {
body := "{\"offsetTable\":[[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":5},23354233],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":4},23354245],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":7},23354203],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":6},23354312],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":1},23373517],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":0},23373350],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":3},23373424],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":2},23373382]]}"
resetOffsetBody := new(ResetOffsetBody)
resetOffsetBody.Decode([]byte(body))
Expand All @@ -433,4 +433,52 @@ func TestRestOffsetBody_MarshalJSON(t *testing.T) {
}
So(offsetTable[messageQueue], ShouldEqual, 23354233)
})

Convey("test ResetOffset Body Decode fast json schema", t, func() {
body := "{\"offsetTable\":{{\"brokerName\":\"RaftNode00\",\"queueId\":0,\"topic\":\"topicB\"}:11110,{\"brokerName\":\"RaftNode00\",\"queueId\":1,\"topic\":\"topicB\"}:0,{\"brokerName\":\"RaftNode00\",\"queueId\":2,\"topic\":\"topicB\"}:0,{\"brokerName\":\"RaftNode00\",\"queueId\":3,\"topic\":\"topicB\"}:0}}"
resetOffsetBody := new(ResetOffsetBody)
resetOffsetBody.Decode([]byte(body))
offsetTable := resetOffsetBody.OffsetTable
So(offsetTable, ShouldNotBeNil)
So(len(offsetTable), ShouldEqual, 4)
messageQueue := primitive.MessageQueue{
Topic: "topicB",
BrokerName: "RaftNode00",
QueueId: 0,
}
So(offsetTable[messageQueue], ShouldEqual, 11110)
})

Convey("test ResetOffset Body Decode fast json schema with one item", t, func() {
body := "{\"offsetTable\":{{\"brokerName\":\"RaftNode00\",\"queueId\":0,\"topic\":\"topicB\"}:11110}}"
resetOffsetBody := new(ResetOffsetBody)
resetOffsetBody.Decode([]byte(body))
offsetTable := resetOffsetBody.OffsetTable
So(offsetTable, ShouldNotBeNil)
So(len(offsetTable), ShouldEqual, 1)
messageQueue := primitive.MessageQueue{
Topic: "topicB",
BrokerName: "RaftNode00",
QueueId: 0,
}
So(offsetTable[messageQueue], ShouldEqual, 11110)
})

Convey("test ResetOffset Body Decode empty fast json ", t, func() {
body := "{\"offsetTable\":{}}"
resetOffsetBody := new(ResetOffsetBody)
resetOffsetBody.Decode([]byte(body))
offsetTable := resetOffsetBody.OffsetTable
So(offsetTable, ShouldNotBeNil)
So(len(offsetTable), ShouldEqual, 0)
})

Convey("test ResetOffset Body Decode empty gson json ", t, func() {
body := "{\"offsetTable\":[]}"
resetOffsetBody := new(ResetOffsetBody)
resetOffsetBody.Decode([]byte(body))
offsetTable := resetOffsetBody.OffsetTable
So(offsetTable, ShouldNotBeNil)
So(len(offsetTable), ShouldEqual, 0)
})
}

0 comments on commit 83f60c1

Please sign in to comment.