Skip to content

Commit

Permalink
Merge branch 'main' into opt/race
Browse files Browse the repository at this point in the history
  • Loading branch information
AsterDY committed Nov 7, 2023
2 parents c30f6b6 + b84e227 commit 974beb5
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 103 deletions.
1 change: 1 addition & 0 deletions fuzz/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func fuzzMain(t *testing.T, data []byte) {
if !json.Valid(data) {
return
}
fuzzStream(t, data)
for i, typ := range []func() interface{}{
func() interface{} { return new(interface{}) },
func() interface{} { return new(map[string]interface{}) },
Expand Down
38 changes: 38 additions & 0 deletions fuzz/other_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
`unicode/utf8`

`github.com/bytedance/sonic/encoder`
`github.com/bytedance/sonic/decoder`
`github.com/stretchr/testify/require`
// `github.com/davecgh/go-spew/spew`
)
Expand All @@ -44,4 +45,41 @@ func fuzzHtmlEscape(t *testing.T, data []byte){
json.HTMLEscape(&jdst, data)
sdst = encoder.HTMLEscape(sdst, data)
require.Equalf(t, string(jdst.Bytes()), string(sdst), "different htmlescape results")
}

// data is random, check whether is panic
func fuzzStream(t *testing.T, data []byte) {
r := bytes.NewBuffer(data)
dc := decoder.NewStreamDecoder(r)
dc.ValidateString()
r1 := bytes.NewBuffer(data)
dc1 := decoder.NewStreamDecoder(r1)

w := bytes.NewBuffer(nil)
ec := encoder.NewStreamEncoder(w)
ec.SetCompactMarshaler(true)
ec.SetValidateString(true)
ec.SetEscapeHTML(true)
ec.SortKeys()
w1 := bytes.NewBuffer(nil)
ec1 := encoder.NewStreamEncoder(w1)

for dc1.More() {
if !dc.More() {
t.Fatal()
}
var obj interface{}
err := dc.Decode(&obj)
var obj1 interface{}
err1 := dc1.Decode(&obj1)
require.Equal(t, err1 == nil, err == nil)
// require.Equal(t, obj, obj1)
if err1 != nil {
return
}

ee := ec.Encode(obj)
ee1 := ec1.Encode(obj1)
require.Equal(t, ee == nil, ee1 == nil)
}
}
205 changes: 117 additions & 88 deletions internal/decoder/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (

`github.com/bytedance/sonic/internal/native`
`github.com/bytedance/sonic/internal/native/types`
`github.com/bytedance/sonic/internal/rt`
`github.com/bytedance/sonic/option`
)

var (
minLeftBufferShift uint = 1
minLeftBufferShift uint = 1
)

// StreamDecoder is the decoder context object for streaming input.
Expand Down Expand Up @@ -58,95 +59,71 @@ func NewStreamDecoder(r io.Reader) *StreamDecoder {
// Either io error from underlying io.Reader (except io.EOF)
// or syntax error from data will be recorded and stop subsequently decoding.
func (self *StreamDecoder) Decode(val interface{}) (err error) {
if self.err != nil {
return self.err
}

var buf = self.buf[self.scanp:]
var p = 0
var recycle bool
if cap(buf) == 0 {
buf = bufPool.Get().([]byte)
recycle = true
}

var first = true
var repeat = true

read_more:
for {
l := len(buf)
realloc(&buf)
n, err := self.r.Read(buf[l:cap(buf)])
buf = buf[:l+n]
if err != nil {
repeat = false
if err == io.EOF {
if len(buf) == 0 {
return err
}
break
}
self.err = err
return err
}
if n > 0 || first {
break
}
}
first = false

l := len(buf)
if l > 0 {
self.Decoder.Reset(string(buf))

var x int
if ret := native.SkipOneFast(&self.s, &x); ret < 0 {
if repeat {
goto read_more
// read more data into buf
if self.More() {
// println(string(self.buf))
var s = self.scanp
try_skip:
var e = len(self.buf)
// println("s:", s, "e:", e, "scanned:",self.scanned, "scanp:",self.scanp, self.buf)
var src = rt.Mem2Str(self.buf[s:e])
// if len(src) > 5 {
// println(src[:5], src[len(src)-5:])
// } else {
// println(src)
// }
// try skip
var x = 0;
if y := native.SkipOneFast(&src, &x); y < 0 {
if self.readMore() {
// println("more")
goto try_skip
} else {
err = SyntaxError{x, self.s, types.ParsingError(-ret), ""}
self.err = err
// println("no more")
err = SyntaxError{e, self.s, types.ParsingError(-s), ""}
self.setErr(err)
return
}
} else {
s = y + s
e = x + s
}


// println("decode: ", s, e)
// must copy string here for safety
self.Decoder.Reset(string(self.buf[s:e]))
err = self.Decoder.Decode(val)
if err != nil {
self.err = err
self.setErr(err)
return
}

p = self.Decoder.Pos()
self.scanned += int64(p)
self.scanp = 0
}

if l > p {
// remain undecoded bytes, so copy them into self.buf
self.buf = append(self.buf[:0], buf[p:]...)
} else {
self.buf = nil
recycle = true
}
self.scanp = e
_, empty := self.scan()
if empty {
// println("recycle")
// no remain valid bytes, thus we just recycle buffer
mem := self.buf
self.buf = nil
bufPool.Put(mem[:0])
} else {
// println("keep")
// remain undecoded bytes, move them onto head
n := copy(self.buf, self.buf[self.scanp:])
self.buf = self.buf[:n]
}

if recycle {
buf = buf[:0]
bufPool.Put(buf)
}
return err
}
self.scanned += int64(self.scanp)
self.scanp = 0
}

func (self StreamDecoder) repeatable(err error) bool {
if ee, ok := err.(SyntaxError); ok &&
(ee.Code == types.ERR_EOF || (ee.Code == types.ERR_INVALID_CHAR && self.i >= len(self.s)-1)) {
return true
}
return false
return self.err
}

// InputOffset returns the input stream byte offset of the current decoder position.
// The offset gives the location of the end of the most recently returned token and the beginning of the next token.
func (self *StreamDecoder) InputOffset() int64 {
// println("input offset",self.scanned, self.scanp)
return self.scanned + int64(self.scanp)
}

Expand All @@ -166,28 +143,72 @@ func (self *StreamDecoder) More() bool {
return err == nil && c != ']' && c != '}'
}

// More reports whether there is another element in the
// current array or object being parsed.
func (self *StreamDecoder) readMore() bool {
if self.err != nil {
return false
}

var err error
var n int
for {
// Grow buffer if not large enough.
l := len(self.buf)
realloc(&self.buf)

n, err = self.r.Read(self.buf[l:cap(self.buf)])
self.buf = self.buf[: l+n]

self.scanp = l
_, empty := self.scan()
if !empty {
return true
}

// buffer has been scanned, now report any error
if err != nil {
self.setErr(err)
return false
}
}
}

func (self *StreamDecoder) setErr(err error) {
self.err = err
mem := self.buf[:0]
self.buf = nil
bufPool.Put(mem)
}

func (self *StreamDecoder) peek() (byte, error) {
var err error
for {
for i := self.scanp; i < len(self.buf); i++ {
c := self.buf[i]
if isSpace(c) {
continue
}
self.scanp = i
return c, nil
c, empty := self.scan()
if !empty {
return byte(c), nil
}
// buffer has been scanned, now report any error
if err != nil {
if err != io.EOF {
self.err = err
}
self.setErr(err)
return 0, err
}
err = self.refill()
}
}

func (self *StreamDecoder) scan() (byte, bool) {
for i := self.scanp; i < len(self.buf); i++ {
c := self.buf[i]
if isSpace(c) {
continue
}
self.scanp = i
return c, false
}
return 0, true
}

func isSpace(c byte) bool {
return types.SPACE_MASK & (1 << c) != 0
}
Expand All @@ -212,17 +233,25 @@ func (self *StreamDecoder) refill() error {
return err
}

func realloc(buf *[]byte) {
func realloc(buf *[]byte) bool {
l := uint(len(*buf))
c := uint(cap(*buf))
if c == 0 {
// println("use pool!")
*buf = bufPool.Get().([]byte)
return true
}
if c - l <= c >> minLeftBufferShift {
// println("realloc!")
e := l+(l>>minLeftBufferShift)
if e < option.DefaultDecoderBufferSize {
e = option.DefaultDecoderBufferSize
if e <= c {
e = c*2
}
tmp := make([]byte, l, e)
copy(tmp, *buf)
*buf = tmp
return true
}
return false
}

0 comments on commit 974beb5

Please sign in to comment.