Skip to content

Commit

Permalink
add(writers) added row and rows writers
Browse files Browse the repository at this point in the history
  • Loading branch information
illia-li committed Oct 14, 2023
1 parent 34fdeeb commit 196e744
Show file tree
Hide file tree
Showing 26 changed files with 1,627 additions and 0 deletions.
26 changes: 26 additions & 0 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"runtime"
"strings"
"time"

"github.com/gocql/gocql/pkg/writers"
)

type unsetColumn struct{}
Expand Down Expand Up @@ -478,6 +480,26 @@ func (f *framer) payload() {
f.flags |= flagCustomPayload
}

func (f *framer) writeRows(r writers.Rows, rows int) error {
r.Prepare(int32(rows))
read, err := r.WriteRows(f.buf)
f.buf = f.buf[read:]
return err
}

func (f *framer) writeRow(r writers.Rows) error {
r.Prepare(int32(1))
read, err := r.WriteRow(f.buf)
f.buf = f.buf[read:]
return err
}

func (f *framer) writeSolitaryRow(r writers.Row) error {
read, err := r.WriteRow(f.buf)
f.buf = f.buf[read:]
return err
}

// reads a frame form the wire into the framers buffer
func (f *framer) readFrame(r io.Reader, head *frameHeader) error {
if head.length < 0 {
Expand Down Expand Up @@ -1748,6 +1770,10 @@ func (f *framer) writeRegisterFrame(streamID int, w *writeRegisterFrame) error {
return f.finish()
}

func (f *framer) notEmpty() bool {
return len(f.buf) > 0
}

func (f *framer) readByte() byte {
if len(f.buf) < 1 {
panic(fmt.Errorf("not enough bytes in buffer to read byte require 1 got: %d", len(f.buf)))
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ require (
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/golang/snappy v0.0.3
github.com/google/uuid v1.3.1
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed
github.com/kr/pretty v0.1.0 // indirect
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.3.0 // indirect
gopkg.in/inf.v0 v0.9.1
)
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
74 changes: 74 additions & 0 deletions pkg/writers/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) 2012 The gocql Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package writers

import "errors"

var NAP = errors.New("write operation is completed, but there is still data left")

type Struct interface {
UnmarshalCQL2(data []byte) int32
}

// Rows describes interaction between callers and Rows`s designed for row by row write.
// Rows useful if expect response with undefined rows count and callers fill the data gradual.
// Callers must call call Prepare(n) before call WriteRow n times and/or before WriteRows call.
// Implementations must not modify the slice data, even temporarily and not retain data.
type Rows interface {
// Prepare helps inform Rows`s about count of rows that callers have. So callers can help Rows`s have better resource pre allocation.
Prepare(rows int32)
// WriteRow write a single row.
// If write is done and len(data)>w must return error=nil.
// If data is not enough to write a row must return io.ErrUnexpectedEOF
WriteRow(data []byte) (w int32, err error)
// WriteRows write all rows from data.
// If write is not done or is partial done because data not enough must return io.ErrUnexpectedEOF
WriteRows(data []byte) (w int32, err error)
ColumnsInRow() int
}

// Row describes interaction between callers and writers designed for solitary row write.
// Row useful if expect response with rows=1 or 0.
// Implementations must not modify the slice data, even temporarily and not retain data.
type Row interface {
// WriteRow write a single row.
// If write is done and len(data)>w must return error=nil.
// If data is not enough to write a row must return io.ErrUnexpectedEOF
WriteRow(data []byte) (write int32, err error)
ColumnsInRow() int
}

// Slice describes interaction between Rows and go slices (actually structures with slices).
// Rows must raise code from any panic due UnmarshalElem calls and call OnElemError.
// Implementations must not modify the slice data, even temporarily and not retain data.
// On wrong data cases implementations must use panic with detailed description.
// Implementations must independently monitor the sufficiency of the slice length.
type Slice interface {
// Init same as ...make(slice,count).
Init(count int32)
// Append same as build in append func.
Append(count int32)
// UnmarshalElem writes data to slice elem, returns write bites count.
UnmarshalElem(data []byte) int32
// NextElem switch Slice to next elem.
NextElem()
// OnElemError helps ToSliceWriter tell to Slice about panic due UnmarshalElem call.
OnElemError()
// CutOnDone helps Slice to free pre allocated resources due all Expand and Init calls.
CutOnDone()
// Len returns current Slice len.
Len() int
}

// Map describes interaction between Rows and go maps.
// Rows must raise code from any panic due UnmarshalMapElem calls and call OnMapElemError.
// Implementations must not modify the slice data, even temporarily and not retain data.
// On wrong data cases implementations must use panic with detailed description.
type Map interface {
// UnmarshalMapElem writes data to map elem, returns write bites count.
UnmarshalMapElem(data []byte) int32
// OnMapElemError helps ToMapWriter tell to Map about panic due UnmarshalMapElem call.
OnMapElemError()
}
155 changes: 155 additions & 0 deletions pkg/writers/row/bench_anti_panic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// nolint
package row

import (
"fmt"
"strings"
"testing"
)

func BenchmarkAntiPanicOrCheckLen_test(b *testing.B) {
bytesLen := 50000000
data := randBytes(bytesLen)
name := fmt.Sprintf("framer len %d", bytesLen)
runAntiPanicWarmup(data, b)
runAntiPanicSet(name, data, b)
bytesLen = 500000
data = randBytes(bytesLen)
name = fmt.Sprintf("framer len %d", bytesLen)
runAntiPanicSet(name, data, b)
bytesLen = 500
data = randBytes(bytesLen)
name = fmt.Sprintf("framer len %d", bytesLen)
runAntiPanicSet(name, data, b)
bytesLen = 50
data = randBytes(bytesLen)
name = fmt.Sprintf("framer len %d", bytesLen)
runAntiPanicSet(name, data, b)
}

func runAntiPanicWarmup(data []byte, b *testing.B) {
b.Run("WARMUP", func(b *testing.B) {
runReadWith("--------warmup---------", read2If4, data, b)
runReadWith("--------warmup---------", read4WithIf10, data, b)
})
}

func runAntiPanicSet(setName string, data []byte, b *testing.B) {
b.Run(setName, func(b *testing.B) {
fmt.Println("--------------------------------------", setName, "---------------------------------------")
runReadWith(" read2If4", read2If4, data, b)
runReadWith("read2WithAntiPanic", read2WithAntiPanic, data, b)
runReadWith(" read4WithIf10", read4WithIf10, data, b)
runReadWith("read4WithAntiPanic", read4WithAntiPanic, data, b)
})
}

func runReadWith(name string, rFunc func([]byte), data []byte, b *testing.B) {
b.Run(name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
rFunc(data)
}
})
}

//go:noinline
func read2If4(data []byte) {
elems := len(data) / 4
tmp := int16(0)
var err error
for idx := 0; idx < elems; idx++ {
tmp, err = dec2ToInt16Err(data)
if err != nil {
return
}
data = data[2:]
tmp, err = dec2ToInt16Err(data)
if err != nil {
return
}
data = data[2:]
_ = tmp
}
}

//go:noinline
func read2WithAntiPanic(data []byte) {
defer func() {
if errF := recover(); errF != nil {
if strings.Contains(fmt.Sprintf("%s", errF), "runtime error: index out of range") {
return
}
}
}()
elems := len(data) / 4
tmp := int16(0)
for idx := 0; idx < elems; idx++ {
tmp = dec2ToInt16(data)
data = data[2:]
tmp = dec2ToInt16(data)
data = data[2:]
_ = tmp
}
}

//go:noinline
func read4WithIf10(data []byte) {
elems := len(data) / 4
tmp := int16(0)
var err error
for idx := 0; idx < elems; idx++ {
tmp, err = twiceDec2ToInt16Err(data)
if err != nil {
return
}
data = data[4:]
tmp, err = twiceDec2ToInt16Err(data)
if err != nil {
return
}
data = data[4:]
_ = tmp
}
}

//go:noinline
func read4WithAntiPanic(data []byte) {
defer func() {
if errF := recover(); errF != nil {
if strings.Contains(fmt.Sprintf("%s", errF), "runtime error: index out of range") {
return
}
}
}()
elems := len(data) / 8
tmp := int16(0)
for idx := 0; idx < elems; idx++ {
tmp = twiceDec2ToInt16(data)
data = data[4:]
tmp = twiceDec2ToInt16(data)
data = data[4:]
_ = tmp
}
}

func twiceDec2ToInt16Err(data []byte) (int16, error) {
tmp, err := dec2ToInt16Err(data)
if err != nil {
return 0, err
}
data = data[2:]
tmp, err = dec2ToInt16Err(data)
if err != nil {
return 0, err
}
data = data[2:]
return tmp, nil
}

func twiceDec2ToInt16(data []byte) int16 {
tmp := dec2ToInt16(data)
data = data[2:]
tmp = dec2ToInt16(data)
data = data[2:]
return tmp
}
88 changes: 88 additions & 0 deletions pkg/writers/row/bench_map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// nolint
package row

import (
"fmt"
"testing"
)

func BenchmarkRowsMapWriters_test(b *testing.B) {
b.ReportAllocs()
rows := 10000
data := randInts(rows, 3)
runRowsMapWritersWarmup(data, rows, b)
runRowsMapWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, b)
rows = 1000
runRowsMapWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, b)
rows = 100
runRowsMapWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, b)
rows = 5
runRowsMapWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, b)
}

func runRowsMapWritersWarmup(data []byte, rows int, b *testing.B) {
b.Run("WARMUP", func(b *testing.B) {
b.Run(" WARMUP", func(b *testing.B) {
for i := 0; i < b.N; i++ {
out := rowsMapDirect(data, rows)
_ = out
}
})
})
}

func runRowsMapWritersSet(setName string, data []byte, rows int, b *testing.B) {
b.Run(setName, func(b *testing.B) {
fmt.Println("--------------------------------------MAP", setName, "MAP---------------------------------------")
b.Run(setName+" rowsMapDirect", func(b *testing.B) {
var out ExampleMap
for i := 0; i < b.N; i++ {
out = rowsMapDirect(data, rows)
}
_ = out
})
b.Run(setName+" rowsMap", func(b *testing.B) {
var out ExampleMap
for i := 0; i < b.N; i++ {
out = rowsMap(data, rows)
}
_ = out
})
b.Run(setName+" rowsMapRef", func(b *testing.B) {
var out ExampleMapRefs
for i := 0; i < b.N; i++ {
out = rowsMapRef(data, rows)
}
_ = out
})
})
}

func rowsMapDirect(data []byte, rows int) ExampleMap {
out := make(ExampleMap, rows)
write := int32(0)
for idx := 0; idx < rows; idx++ {
var tmp ExampleStruct
write = tmp.UnmarshalCQL2(data[write:])
out[tmp.GetID()] = tmp
}
return out
}

func rowsMap(data []byte, rows int) ExampleMap {
out := make(ExampleMap, rows)
write := int32(0)
for idx := 0; idx < rows; idx++ {
write = out.UnmarshalMapElem(data[write:])
}
return out
}

func rowsMapRef(data []byte, rows int) ExampleMapRefs {
out := make(ExampleMapRefs, rows)
write := int32(0)
for idx := 0; idx < rows; idx++ {
write = out.UnmarshalMapElem(data[write:])
}
return out
}

0 comments on commit 196e744

Please sign in to comment.