From 196e744fd7530457b040fd469b2f3465141b0dc1 Mon Sep 17 00:00:00 2001 From: illia-li Date: Sat, 14 Oct 2023 15:31:29 -0400 Subject: [PATCH] add(writers) added row and rows writers --- frame.go | 26 +++ go.mod | 2 + go.sum | 4 + pkg/writers/interface.go | 74 ++++++++ pkg/writers/row/bench_anti_panic_test.go | 155 +++++++++++++++++ pkg/writers/row/bench_map_test.go | 88 ++++++++++ pkg/writers/row/bench_ref_map_test.go | 77 +++++++++ pkg/writers/row/bench_slice_refs_test.go | 176 ++++++++++++++++++++ pkg/writers/row/bench_slice_test.go | 173 +++++++++++++++++++ pkg/writers/row/example_map.go | 52 ++++++ pkg/writers/row/example_ref_slice.go | 50 ++++++ pkg/writers/row/example_ref_slice_refs.go | 51 ++++++ pkg/writers/row/example_slice.go | 46 +++++ pkg/writers/row/example_slice_refs.go | 47 ++++++ pkg/writers/row/example_struct.go | 81 +++++++++ pkg/writers/row/utils.go | 60 +++++++ pkg/writers/row/writer_map.go | 72 ++++++++ pkg/writers/row/writer_map_test.go | 1 + pkg/writers/row/writer_slice.go | 81 +++++++++ pkg/writers/row_gen/go.mod | 3 + pkg/writers/row_gen/go.sum | 0 pkg/writers/row_gen/writer_ref_slice.go | 98 +++++++++++ pkg/writers/row_gen/writer_ref_slice_ref.go | 101 +++++++++++ pkg/writers/solo_row/interface.go | 5 + pkg/writers/solo_row/solo_ref.go | 51 ++++++ session.go | 53 ++++++ 26 files changed, 1627 insertions(+) create mode 100644 pkg/writers/interface.go create mode 100644 pkg/writers/row/bench_anti_panic_test.go create mode 100644 pkg/writers/row/bench_map_test.go create mode 100644 pkg/writers/row/bench_ref_map_test.go create mode 100644 pkg/writers/row/bench_slice_refs_test.go create mode 100644 pkg/writers/row/bench_slice_test.go create mode 100644 pkg/writers/row/example_map.go create mode 100644 pkg/writers/row/example_ref_slice.go create mode 100644 pkg/writers/row/example_ref_slice_refs.go create mode 100644 pkg/writers/row/example_slice.go create mode 100644 pkg/writers/row/example_slice_refs.go create mode 100644 pkg/writers/row/example_struct.go create mode 100644 pkg/writers/row/utils.go create mode 100644 pkg/writers/row/writer_map.go create mode 100644 pkg/writers/row/writer_map_test.go create mode 100644 pkg/writers/row/writer_slice.go create mode 100644 pkg/writers/row_gen/go.mod create mode 100644 pkg/writers/row_gen/go.sum create mode 100644 pkg/writers/row_gen/writer_ref_slice.go create mode 100644 pkg/writers/row_gen/writer_ref_slice_ref.go create mode 100644 pkg/writers/solo_row/interface.go create mode 100644 pkg/writers/solo_row/solo_ref.go diff --git a/frame.go b/frame.go index 44be7879d..1a6ea1486 100644 --- a/frame.go +++ b/frame.go @@ -14,6 +14,8 @@ import ( "runtime" "strings" "time" + + "github.com/gocql/gocql/pkg/writers" ) type unsetColumn struct{} @@ -478,6 +480,26 @@ func (f *framer) payload() { f.flags |= flagCustomPayload } +func (f *framer) writeRows(r writers.Rows, rows int) error { + r.Prepare(int32(rows)) + read, err := r.WriteRows(f.buf) + f.buf = f.buf[read:] + return err +} + +func (f *framer) writeRow(r writers.Rows) error { + r.Prepare(int32(1)) + read, err := r.WriteRow(f.buf) + f.buf = f.buf[read:] + return err +} + +func (f *framer) writeSolitaryRow(r writers.Row) error { + read, err := r.WriteRow(f.buf) + f.buf = f.buf[read:] + return err +} + // reads a frame form the wire into the framers buffer func (f *framer) readFrame(r io.Reader, head *frameHeader) error { if head.length < 0 { @@ -1748,6 +1770,10 @@ func (f *framer) writeRegisterFrame(streamID int, w *writeRegisterFrame) error { return f.finish() } +func (f *framer) notEmpty() bool { + return len(f.buf) > 0 +} + func (f *framer) readByte() byte { if len(f.buf) < 1 { panic(fmt.Errorf("not enough bytes in buffer to read byte require 1 got: %d", len(f.buf))) diff --git a/go.mod b/go.mod index a18289252..de1878481 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,10 @@ require ( github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/golang/snappy v0.0.3 + github.com/google/uuid v1.3.1 github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed github.com/kr/pretty v0.1.0 // indirect + github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.3.0 // indirect gopkg.in/inf.v0 v0.9.1 ) diff --git a/go.sum b/go.sum index 2e3892bcb..eeef4fc3f 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= @@ -13,6 +15,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/pkg/writers/interface.go b/pkg/writers/interface.go new file mode 100644 index 000000000..755e8d1a9 --- /dev/null +++ b/pkg/writers/interface.go @@ -0,0 +1,74 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package writers + +import "errors" + +var NAP = errors.New("write operation is completed, but there is still data left") + +type Struct interface { + UnmarshalCQL2(data []byte) int32 +} + +// Rows describes interaction between callers and Rows`s designed for row by row write. +// Rows useful if expect response with undefined rows count and callers fill the data gradual. +// Callers must call call Prepare(n) before call WriteRow n times and/or before WriteRows call. +// Implementations must not modify the slice data, even temporarily and not retain data. +type Rows interface { + // Prepare helps inform Rows`s about count of rows that callers have. So callers can help Rows`s have better resource pre allocation. + Prepare(rows int32) + // WriteRow write a single row. + // If write is done and len(data)>w must return error=nil. + // If data is not enough to write a row must return io.ErrUnexpectedEOF + WriteRow(data []byte) (w int32, err error) + // WriteRows write all rows from data. + // If write is not done or is partial done because data not enough must return io.ErrUnexpectedEOF + WriteRows(data []byte) (w int32, err error) + ColumnsInRow() int +} + +// Row describes interaction between callers and writers designed for solitary row write. +// Row useful if expect response with rows=1 or 0. +// Implementations must not modify the slice data, even temporarily and not retain data. +type Row interface { + // WriteRow write a single row. + // If write is done and len(data)>w must return error=nil. + // If data is not enough to write a row must return io.ErrUnexpectedEOF + WriteRow(data []byte) (write int32, err error) + ColumnsInRow() int +} + +// Slice describes interaction between Rows and go slices (actually structures with slices). +// Rows must raise code from any panic due UnmarshalElem calls and call OnElemError. +// Implementations must not modify the slice data, even temporarily and not retain data. +// On wrong data cases implementations must use panic with detailed description. +// Implementations must independently monitor the sufficiency of the slice length. +type Slice interface { + // Init same as ...make(slice,count). + Init(count int32) + // Append same as build in append func. + Append(count int32) + // UnmarshalElem writes data to slice elem, returns write bites count. + UnmarshalElem(data []byte) int32 + // NextElem switch Slice to next elem. + NextElem() + // OnElemError helps ToSliceWriter tell to Slice about panic due UnmarshalElem call. + OnElemError() + // CutOnDone helps Slice to free pre allocated resources due all Expand and Init calls. + CutOnDone() + // Len returns current Slice len. + Len() int +} + +// Map describes interaction between Rows and go maps. +// Rows must raise code from any panic due UnmarshalMapElem calls and call OnMapElemError. +// Implementations must not modify the slice data, even temporarily and not retain data. +// On wrong data cases implementations must use panic with detailed description. +type Map interface { + // UnmarshalMapElem writes data to map elem, returns write bites count. + UnmarshalMapElem(data []byte) int32 + // OnMapElemError helps ToMapWriter tell to Map about panic due UnmarshalMapElem call. + OnMapElemError() +} diff --git a/pkg/writers/row/bench_anti_panic_test.go b/pkg/writers/row/bench_anti_panic_test.go new file mode 100644 index 000000000..f2637a3a4 --- /dev/null +++ b/pkg/writers/row/bench_anti_panic_test.go @@ -0,0 +1,155 @@ +// nolint +package row + +import ( + "fmt" + "strings" + "testing" +) + +func BenchmarkAntiPanicOrCheckLen_test(b *testing.B) { + bytesLen := 50000000 + data := randBytes(bytesLen) + name := fmt.Sprintf("framer len %d", bytesLen) + runAntiPanicWarmup(data, b) + runAntiPanicSet(name, data, b) + bytesLen = 500000 + data = randBytes(bytesLen) + name = fmt.Sprintf("framer len %d", bytesLen) + runAntiPanicSet(name, data, b) + bytesLen = 500 + data = randBytes(bytesLen) + name = fmt.Sprintf("framer len %d", bytesLen) + runAntiPanicSet(name, data, b) + bytesLen = 50 + data = randBytes(bytesLen) + name = fmt.Sprintf("framer len %d", bytesLen) + runAntiPanicSet(name, data, b) +} + +func runAntiPanicWarmup(data []byte, b *testing.B) { + b.Run("WARMUP", func(b *testing.B) { + runReadWith("--------warmup---------", read2If4, data, b) + runReadWith("--------warmup---------", read4WithIf10, data, b) + }) +} + +func runAntiPanicSet(setName string, data []byte, b *testing.B) { + b.Run(setName, func(b *testing.B) { + fmt.Println("--------------------------------------", setName, "---------------------------------------") + runReadWith(" read2If4", read2If4, data, b) + runReadWith("read2WithAntiPanic", read2WithAntiPanic, data, b) + runReadWith(" read4WithIf10", read4WithIf10, data, b) + runReadWith("read4WithAntiPanic", read4WithAntiPanic, data, b) + }) +} + +func runReadWith(name string, rFunc func([]byte), data []byte, b *testing.B) { + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + rFunc(data) + } + }) +} + +//go:noinline +func read2If4(data []byte) { + elems := len(data) / 4 + tmp := int16(0) + var err error + for idx := 0; idx < elems; idx++ { + tmp, err = dec2ToInt16Err(data) + if err != nil { + return + } + data = data[2:] + tmp, err = dec2ToInt16Err(data) + if err != nil { + return + } + data = data[2:] + _ = tmp + } +} + +//go:noinline +func read2WithAntiPanic(data []byte) { + defer func() { + if errF := recover(); errF != nil { + if strings.Contains(fmt.Sprintf("%s", errF), "runtime error: index out of range") { + return + } + } + }() + elems := len(data) / 4 + tmp := int16(0) + for idx := 0; idx < elems; idx++ { + tmp = dec2ToInt16(data) + data = data[2:] + tmp = dec2ToInt16(data) + data = data[2:] + _ = tmp + } +} + +//go:noinline +func read4WithIf10(data []byte) { + elems := len(data) / 4 + tmp := int16(0) + var err error + for idx := 0; idx < elems; idx++ { + tmp, err = twiceDec2ToInt16Err(data) + if err != nil { + return + } + data = data[4:] + tmp, err = twiceDec2ToInt16Err(data) + if err != nil { + return + } + data = data[4:] + _ = tmp + } +} + +//go:noinline +func read4WithAntiPanic(data []byte) { + defer func() { + if errF := recover(); errF != nil { + if strings.Contains(fmt.Sprintf("%s", errF), "runtime error: index out of range") { + return + } + } + }() + elems := len(data) / 8 + tmp := int16(0) + for idx := 0; idx < elems; idx++ { + tmp = twiceDec2ToInt16(data) + data = data[4:] + tmp = twiceDec2ToInt16(data) + data = data[4:] + _ = tmp + } +} + +func twiceDec2ToInt16Err(data []byte) (int16, error) { + tmp, err := dec2ToInt16Err(data) + if err != nil { + return 0, err + } + data = data[2:] + tmp, err = dec2ToInt16Err(data) + if err != nil { + return 0, err + } + data = data[2:] + return tmp, nil +} + +func twiceDec2ToInt16(data []byte) int16 { + tmp := dec2ToInt16(data) + data = data[2:] + tmp = dec2ToInt16(data) + data = data[2:] + return tmp +} diff --git a/pkg/writers/row/bench_map_test.go b/pkg/writers/row/bench_map_test.go new file mode 100644 index 000000000..b247301f1 --- /dev/null +++ b/pkg/writers/row/bench_map_test.go @@ -0,0 +1,88 @@ +// nolint +package row + +import ( + "fmt" + "testing" +) + +func BenchmarkRowsMapWriters_test(b *testing.B) { + b.ReportAllocs() + rows := 10000 + data := randInts(rows, 3) + runRowsMapWritersWarmup(data, rows, b) + runRowsMapWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, b) + rows = 1000 + runRowsMapWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, b) + rows = 100 + runRowsMapWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, b) + rows = 5 + runRowsMapWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, b) +} + +func runRowsMapWritersWarmup(data []byte, rows int, b *testing.B) { + b.Run("WARMUP", func(b *testing.B) { + b.Run(" WARMUP", func(b *testing.B) { + for i := 0; i < b.N; i++ { + out := rowsMapDirect(data, rows) + _ = out + } + }) + }) +} + +func runRowsMapWritersSet(setName string, data []byte, rows int, b *testing.B) { + b.Run(setName, func(b *testing.B) { + fmt.Println("--------------------------------------MAP", setName, "MAP---------------------------------------") + b.Run(setName+" rowsMapDirect", func(b *testing.B) { + var out ExampleMap + for i := 0; i < b.N; i++ { + out = rowsMapDirect(data, rows) + } + _ = out + }) + b.Run(setName+" rowsMap", func(b *testing.B) { + var out ExampleMap + for i := 0; i < b.N; i++ { + out = rowsMap(data, rows) + } + _ = out + }) + b.Run(setName+" rowsMapRef", func(b *testing.B) { + var out ExampleMapRefs + for i := 0; i < b.N; i++ { + out = rowsMapRef(data, rows) + } + _ = out + }) + }) +} + +func rowsMapDirect(data []byte, rows int) ExampleMap { + out := make(ExampleMap, rows) + write := int32(0) + for idx := 0; idx < rows; idx++ { + var tmp ExampleStruct + write = tmp.UnmarshalCQL2(data[write:]) + out[tmp.GetID()] = tmp + } + return out +} + +func rowsMap(data []byte, rows int) ExampleMap { + out := make(ExampleMap, rows) + write := int32(0) + for idx := 0; idx < rows; idx++ { + write = out.UnmarshalMapElem(data[write:]) + } + return out +} + +func rowsMapRef(data []byte, rows int) ExampleMapRefs { + out := make(ExampleMapRefs, rows) + write := int32(0) + for idx := 0; idx < rows; idx++ { + write = out.UnmarshalMapElem(data[write:]) + } + return out +} diff --git a/pkg/writers/row/bench_ref_map_test.go b/pkg/writers/row/bench_ref_map_test.go new file mode 100644 index 000000000..f82ba9cb7 --- /dev/null +++ b/pkg/writers/row/bench_ref_map_test.go @@ -0,0 +1,77 @@ +// nolint +package row + +import ( + "fmt" + "testing" +) + +func BenchmarkRowsRefMapWriters_test(b *testing.B) { + b.ReportAllocs() + rows := 10000 + data := randInts(rows, 3) + runRowsMapWritersWarmup(data, rows, b) + runRowsRefMapWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, b) + rows = 1000 + runRowsRefMapWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, b) + rows = 100 + runRowsRefMapWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, b) + rows = 5 + runRowsRefMapWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, b) +} + +func runRowsRefMapWritersSet(setName string, data []byte, rows int, b *testing.B) { + b.Run(setName, func(b *testing.B) { + fmt.Println("--------------------------------------MAP", setName, "MAP---------------------------------------") + b.Run(setName+"rowsRefMapDirect", func(b *testing.B) { + var out ExampleRefMap + for i := 0; i < b.N; i++ { + out = rowsRefMapDirect(data, rows) + } + _ = out + }) + b.Run(setName+" rowsRefMap", func(b *testing.B) { + var out ExampleRefMap + for i := 0; i < b.N; i++ { + out = rowsRefMap(data, rows) + } + _ = out + }) + b.Run(setName+" rowsRefMapRef", func(b *testing.B) { + var out ExamplesRefMapRefs + for i := 0; i < b.N; i++ { + out = rowsRefMapRef(data, rows) + } + _ = out + }) + }) +} + +func rowsRefMapDirect(data []byte, rows int) ExampleRefMap { + out := make(ExampleRefMap, rows) + write := int32(0) + for idx := 0; idx < rows; idx++ { + var tmp ExampleStruct + write = tmp.UnmarshalCQL2(data[write:]) + out[tmp.GetID()] = &tmp + } + return out +} + +func rowsRefMap(data []byte, rows int) ExampleRefMap { + out := make(ExampleRefMap, rows) + write := int32(0) + for idx := 0; idx < rows; idx++ { + write = out.UnmarshalMapElem(data[write:]) + } + return out +} + +func rowsRefMapRef(data []byte, rows int) ExamplesRefMapRefs { + out := make(ExamplesRefMapRefs, rows) + write := int32(0) + for idx := 0; idx < rows; idx++ { + write = out.UnmarshalMapElem(data[write:]) + } + return out +} diff --git a/pkg/writers/row/bench_slice_refs_test.go b/pkg/writers/row/bench_slice_refs_test.go new file mode 100644 index 000000000..dde524d1b --- /dev/null +++ b/pkg/writers/row/bench_slice_refs_test.go @@ -0,0 +1,176 @@ +// nolint +package row + +import ( + "fmt" + "testing" +) + +func BenchmarkSliceRefs_test(b *testing.B) { + b.ReportAllocs() + rows := 10000 + data := randInts(rows, 3) + runSliceRefsWarmup(data, rows, b) + runSliceRefsSet(fmt.Sprintf("rows count %d", rows), data, rows, 100, b) + rows = 1000 + runSliceRefsSet(fmt.Sprintf("rows count %d", rows), data, rows, 10, b) + rows = 100 + runSliceRefsSet(fmt.Sprintf("rows count %d", rows), data, rows, 5, b) + rows = 10 + runSliceRefsSet(fmt.Sprintf("rows count %d", rows), data, rows, 5, b) + rows = 5 + runSliceRefsSet(fmt.Sprintf("rows count %d", rows), data, rows, 1, b) + rows = 2 + runSliceRefsSet(fmt.Sprintf("rows count %d", rows), data, rows, 1, b) +} + +func BenchmarkOnceSliceRefs_test(b *testing.B) { + b.ReportAllocs() + b.N = 1 + rows := 10 + data := randInts(rows, 3) + runSliceRefsSet(fmt.Sprintf("rows count %d", rows), data, rows, 1, b) +} + +func runSliceRefsWarmup(data []byte, rows int, b *testing.B) { + b.Run("WARMUP", func(b *testing.B) { + b.Run(" WARMUP", func(b *testing.B) { + for i := 0; i < b.N; i++ { + out := writeDirectSliceRefs(data, rows, 1) + _ = out + } + }) + }) +} + +func runSliceRefsSet(setName string, data []byte, rows, segments int, b *testing.B) { + b.Run(setName, func(b *testing.B) { + fmt.Println("--------------------------------------SLICE", setName, "SLICE---------------------------------------") + b.Run(setName+" writeDirectSliceRefs", func(b *testing.B) { + var out []*ExampleStruct + for i := 0; i < b.N; i++ { + out = writeDirectSliceRefs(data, rows, segments) + } + _ = out + }) + b.Run(setName+" writeFuncSliceRefs", func(b *testing.B) { + var out []*ExampleStruct + for i := 0; i < b.N; i++ { + out = writeFuncSliceRefs(data, rows, segments) + } + _ = out + }) + b.Run(setName+" writeFuncToSliceRefs", func(b *testing.B) { + var out []*ExampleStruct + for i := 0; i < b.N; i++ { + out = writeFuncToSliceRefs(data, rows, segments) + } + _ = out + }) + b.Run(setName+" writeDirectRefSliceRefs", func(b *testing.B) { + var out []*ExampleStruct + for i := 0; i < b.N; i++ { + out = writeDirectRefSliceRefs(data, rows, segments) + } + _ = out + }) + b.Run(setName+" writeSliceRefsMethod", func(b *testing.B) { + var out []*ExampleStruct + for i := 0; i < b.N; i++ { + out = writeSliceRefsMethod(data, rows, segments) + } + _ = out + }) + b.Run(setName+" writeRefSliceRefsMethod", func(b *testing.B) { + var out []*ExampleStruct + for i := 0; i < b.N; i++ { + out = writeRefSliceRefsMethod(data, rows, segments) + } + _ = out + }) + }) +} + +func writeDirectSliceRefs(data []byte, rows, segments int) []*ExampleStruct { + out := make([]*ExampleStruct, rows/segments) + write := int32(0) + for idx := 0; idx < rows; idx++ { + if len(out) <= idx { + out = append(out, make([]*ExampleStruct, rows/segments)...) + } + out[idx] = &ExampleStruct{} + write = out[idx].UnmarshalCQL2(data[write:]) + } + return out +} + +func writeFuncSliceRefs(data []byte, rows, segments int) []*ExampleStruct { + out := make([]*ExampleStruct, rows/segments) + write := int32(0) + for idx := 0; idx < rows; idx++ { + if len(out) <= idx { + out = append(out, make([]*ExampleStruct, rows/segments)...) + } + out[idx] = &ExampleStruct{} + write = unmarshalToExample(out[idx], data[write:]) + } + return out +} + +func writeFuncToSliceRefs(data []byte, rows, segments int) []*ExampleStruct { + out := make([]*ExampleStruct, rows/segments) + write := int32(0) + for idx := 0; idx < rows; idx++ { + if len(out) <= idx { + out = append(out, make([]*ExampleStruct, rows/segments)...) + } + write, out[idx] = unmarshalRefExample(data[write:]) + } + return out +} + +func writeDirectRefSliceRefs(data []byte, rows, segments int) []*ExampleStruct { + var out *[]*ExampleStruct + tmp := make([]*ExampleStruct, rows/segments) + out = &tmp + write := int32(0) + for idx := 0; idx < rows; idx++ { + if len(*out) <= idx { + *out = append(*out, make([]*ExampleStruct, rows/segments)...) + } + (*out)[idx] = &ExampleStruct{} + write = (*out)[idx].UnmarshalCQL2(data[write:]) + } + return tmp +} + +var sliceRefs ExampleSliceRefs + +func writeSliceRefsMethod(data []byte, rows, segments int) []*ExampleStruct { + sliceRefs.Init(int32(rows / segments)) + write := int32(0) + for idx := 0; idx < rows; idx++ { + if sliceRefs.Len() <= idx { + sliceRefs.Append(int32(rows / segments)) + } + write = sliceRefs.UnmarshalElem(data[write:]) + sliceRefs.NextElem() + } + return sliceRefs.Rows +} + +var refSliceRefsM ExampleRefSliceRefs + +func writeRefSliceRefsMethod(data []byte, rows, segments int) []*ExampleStruct { + out := make([]*ExampleStruct, rows/segments) + refSliceRefsM.ReUse(&out) + write := int32(0) + for idx := 0; idx < rows; idx++ { + if refSliceRefsM.Len() <= idx { + refSliceRefsM.Append(int32(rows / segments)) + } + write = refSliceRefsM.UnmarshalElem(data[write:]) + refSliceRefsM.NextElem() + } + return out +} diff --git a/pkg/writers/row/bench_slice_test.go b/pkg/writers/row/bench_slice_test.go new file mode 100644 index 000000000..cd437a66b --- /dev/null +++ b/pkg/writers/row/bench_slice_test.go @@ -0,0 +1,173 @@ +// nolint +package row + +import ( + "fmt" + "testing" +) + +func BenchmarkRowsRefSliceWriters_test(b *testing.B) { + b.ReportAllocs() + rows := 10000 + data := randInts(rows, 3) + runRowsWritersWarmup(data, rows, b) + runRowsRefSliceWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, 100, b) + rows = 1000 + runRowsRefSliceWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, 10, b) + rows = 100 + runRowsRefSliceWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, 5, b) + rows = 10 + runRowsRefSliceWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, 5, b) + rows = 5 + runRowsRefSliceWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, 1, b) + rows = 2 + runRowsRefSliceWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, 1, b) +} + +func BenchmarkOnceSliceWriters_test(b *testing.B) { + b.ReportAllocs() + b.N = 1 + rows := 10 + data := randInts(rows, 3) + runRowsRefSliceWritersSet(fmt.Sprintf("rows count %d", rows), data, rows, 1, b) +} + +func runRowsWritersWarmup(data []byte, rows int, b *testing.B) { + b.Run("WARMUP", func(b *testing.B) { + b.Run(" WARMUP", func(b *testing.B) { + for i := 0; i < b.N; i++ { + out := writeDirectSlice(data, rows, 1) + _ = out + } + }) + }) +} + +func runRowsRefSliceWritersSet(setName string, data []byte, rows, segments int, b *testing.B) { + b.Run(setName, func(b *testing.B) { + fmt.Println("--------------------------------------SLICE", setName, "SLICE---------------------------------------") + b.Run(setName+" writeDirectSlice", func(b *testing.B) { + var out []ExampleStruct + for i := 0; i < b.N; i++ { + out = writeDirectSlice(data, rows, segments) + } + _ = out + }) + b.Run(setName+" writeFuncSlice", func(b *testing.B) { + var out []ExampleStruct + for i := 0; i < b.N; i++ { + out = writeFuncSlice(data, rows, segments) + } + _ = out + }) + b.Run(setName+"writeFuncToSlice", func(b *testing.B) { + var out []ExampleStruct + for i := 0; i < b.N; i++ { + out = writeFuncToSlice(data, rows, segments) + } + _ = out + }) + b.Run(setName+" writeSliceMethod", func(b *testing.B) { + var out []ExampleStruct + for i := 0; i < b.N; i++ { + out = writeSliceMethod(data, rows, segments) + } + _ = out + }) + b.Run(setName+" writeDirectRefSlice", func(b *testing.B) { + var out []ExampleStruct + for i := 0; i < b.N; i++ { + out = writeDirectRefSlice(data, rows, segments) + } + _ = out + }) + b.Run(setName+" writeRefSliceMethod", func(b *testing.B) { + var out []ExampleStruct + for i := 0; i < b.N; i++ { + out = writeRefSliceMethod(data, rows, segments) + } + _ = out + }) + }) +} + +func writeDirectSlice(data []byte, rows, segments int) []ExampleStruct { + out := make([]ExampleStruct, rows/segments) + write := int32(0) + for idx := 0; idx < rows; idx++ { + if len(out) <= idx { + out = append(out, make([]ExampleStruct, rows/segments)...) + } + write = out[idx].UnmarshalCQL2(data[write:]) + } + return out +} + +func writeFuncSlice(data []byte, rows, segments int) []ExampleStruct { + out := make([]ExampleStruct, rows/segments) + write := int32(0) + for idx := 0; idx < rows; idx++ { + if len(out) <= idx { + out = append(out, make([]ExampleStruct, rows/segments)...) + } + write, out[idx] = unmarshalExample(data[write:]) + } + return out +} + +func writeFuncToSlice(data []byte, rows, segments int) []ExampleStruct { + out := make([]ExampleStruct, rows/segments) + write := int32(0) + for idx := 0; idx < rows; idx++ { + if len(out) <= idx { + out = append(out, make([]ExampleStruct, rows/segments)...) + } + write = unmarshalToExample(&out[idx], data[write:]) + } + return out +} + +var slice ExampleSlice + +func writeDirectRefSlice(data []byte, rows, segments int) []ExampleStruct { + var refSlice *[]ExampleStruct + tmp := make([]ExampleStruct, rows/segments) + refSlice = &tmp + write := int32(0) + for idx := 0; idx < rows; idx++ { + if len(*refSlice) <= idx { + *refSlice = append(*refSlice, make([]ExampleStruct, rows/segments)...) + } + write = (*refSlice)[idx].UnmarshalCQL2(data[write:]) + } + return tmp +} + +func writeSliceMethod(data []byte, rows, segments int) []ExampleStruct { + slice.Init(int32(rows / segments)) + write := int32(0) + for idx := 0; idx < rows; idx++ { + if slice.Len() <= idx { + slice.Append(int32(rows / segments)) + } + write = slice.UnmarshalElem(data[write:]) + slice.NextElem() + } + return slice.Rows +} + +var refSliceM ExampleRefSlice + +func writeRefSliceMethod(data []byte, rows, segments int) []ExampleStruct { + out := make([]ExampleStruct, rows/segments) + refSliceM.ReUse(&out) + write := int32(0) + for idx := 0; idx < rows; idx++ { + if refSliceM.Len() <= idx { + refSliceM.Append(int32(rows / segments)) + } + write = refSliceM.UnmarshalElem(data[write:]) + refSliceM.NextElem() + } + return out +} diff --git a/pkg/writers/row/example_map.go b/pkg/writers/row/example_map.go new file mode 100644 index 000000000..d62e5d3a2 --- /dev/null +++ b/pkg/writers/row/example_map.go @@ -0,0 +1,52 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package row + +type ExampleMap map[int32]ExampleStruct + +func (s ExampleMap) UnmarshalMapElem(data []byte) int32 { + tmp := ExampleStruct{} + write := tmp.UnmarshalCQL2(data) + s[tmp.GetID()] = tmp + return write +} + +func (s ExampleMap) OnMapElemError() { +} + +type ExampleMapRefs map[int32]ExampleStruct + +func (s *ExampleMapRefs) UnmarshalMapElem(data []byte) int32 { + tmp := ExampleStruct{} + write := tmp.UnmarshalCQL2(data) + (*s)[tmp.GetID()] = tmp + return write +} + +func (s *ExampleMapRefs) OnMapElemError() { +} + +type ExampleRefMap map[int32]*ExampleStruct + +func (s ExampleRefMap) UnmarshalMapElem(data []byte) int32 { + tmp := ExampleStruct{} + write := tmp.UnmarshalCQL2(data) + s[tmp.GetID()] = &tmp + return write +} + +func (s ExampleRefMap) OnMapElemError() { +} + +type ExamplesRefMapRefs map[int32]*ExampleStruct + +func (s *ExamplesRefMapRefs) UnmarshalMapElem(data []byte) int32 { + tmp := ExampleStruct{} + write := tmp.UnmarshalCQL2(data) + (*s)[tmp.GetID()] = &tmp + return write +} +func (s *ExamplesRefMapRefs) OnMapElemError() { +} diff --git a/pkg/writers/row/example_ref_slice.go b/pkg/writers/row/example_ref_slice.go new file mode 100644 index 000000000..57b7a5656 --- /dev/null +++ b/pkg/writers/row/example_ref_slice.go @@ -0,0 +1,50 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package row + +// ExampleRefSlice represent example implemented Slice interface with processing by reference of slice for slice. +// Comparison of slice processing in various ways you can see in bench_slice_test.go. +type ExampleRefSlice struct { + rows *[]ExampleStruct + rowIdx int +} + +func (s *ExampleRefSlice) ReUse(rows *[]ExampleStruct) { + s.rowIdx = 0 + s.rows = rows +} + +func (s *ExampleRefSlice) Init(count int32) { + *s.rows = make([]ExampleStruct, count) +} + +func (s *ExampleRefSlice) Append(count int32) { + *s.rows = append(*s.rows, make([]ExampleStruct, count)...) +} + +func (s *ExampleRefSlice) UnmarshalElem(data []byte) int32 { + if s.rowIdx > s.Len()-1 { + s.Append(1) + } + return (*s.rows)[s.rowIdx].UnmarshalCQL2(data) +} + +func (s *ExampleRefSlice) NextElem() { + s.rowIdx++ +} + +func (s *ExampleRefSlice) OnElemError() { + (*s.rows)[s.rowIdx] = *new(ExampleStruct) +} + +func (s *ExampleRefSlice) CutOnDone() { + if s.rowIdx+1 < s.Len() { + *s.rows = (*s.rows)[:s.rowIdx+1] + } +} + +func (s *ExampleRefSlice) Len() int { + return len(*s.rows) +} diff --git a/pkg/writers/row/example_ref_slice_refs.go b/pkg/writers/row/example_ref_slice_refs.go new file mode 100644 index 000000000..42dc9a7c6 --- /dev/null +++ b/pkg/writers/row/example_ref_slice_refs.go @@ -0,0 +1,51 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package row + +// ExampleRefSliceRefs represent example implemented Slice interface with processing by reference of slice for slice of references. +// Comparison of slice processing in various ways you can see in bench_slice_test.go. +type ExampleRefSliceRefs struct { + rows *[]*ExampleStruct + rowIdx int +} + +func (s *ExampleRefSliceRefs) ReUse(rows *[]*ExampleStruct) { + s.rowIdx = 0 + s.rows = rows +} + +func (s *ExampleRefSliceRefs) Init(count int32) { + *s.rows = make([]*ExampleStruct, count) +} + +func (s *ExampleRefSliceRefs) Append(count int32) { + *s.rows = append(*s.rows, make([]*ExampleStruct, count)...) +} + +func (s *ExampleRefSliceRefs) UnmarshalElem(data []byte) int32 { + if s.rowIdx > s.Len()-1 { + s.Append(1) + } + (*s.rows)[s.rowIdx] = &ExampleStruct{} + return (*s.rows)[s.rowIdx].UnmarshalCQL2(data) +} + +func (s *ExampleRefSliceRefs) NextElem() { + s.rowIdx++ +} + +func (s *ExampleRefSliceRefs) OnElemError() { + (*s.rows)[s.rowIdx] = new(ExampleStruct) +} + +func (s *ExampleRefSliceRefs) CutOnDone() { + if s.rowIdx+1 < s.Len() { + *s.rows = (*s.rows)[:s.rowIdx+1] + } +} + +func (s *ExampleRefSliceRefs) Len() int { + return len(*s.rows) +} diff --git a/pkg/writers/row/example_slice.go b/pkg/writers/row/example_slice.go new file mode 100644 index 000000000..e54043539 --- /dev/null +++ b/pkg/writers/row/example_slice.go @@ -0,0 +1,46 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package row + +// ExampleSlice represent example implemented Slice interface with direct slice processing for slice. +// Comparison of slice processing in various ways you can see in bench_slice_test.go. +type ExampleSlice struct { + Rows []ExampleStruct + rowIdx int +} + +func (s *ExampleSlice) Init(count int32) { + s.rowIdx = 0 + s.Rows = make([]ExampleStruct, count) +} + +func (s *ExampleSlice) Append(count int32) { + s.Rows = append(s.Rows, make([]ExampleStruct, count)...) +} + +func (s *ExampleSlice) UnmarshalElem(data []byte) int32 { + if s.rowIdx > s.Len()-1 { + s.Append(1) + } + return s.Rows[s.rowIdx].UnmarshalCQL2(data) +} + +func (s *ExampleSlice) NextElem() { + s.rowIdx++ +} + +func (s *ExampleSlice) OnElemError() { + s.Rows[s.rowIdx] = *new(ExampleStruct) +} + +func (s *ExampleSlice) CutOnDone() { + if s.rowIdx+1 < s.Len() { + s.Rows = s.Rows[:s.rowIdx+1] + } +} + +func (s *ExampleSlice) Len() int { + return len(s.Rows) +} diff --git a/pkg/writers/row/example_slice_refs.go b/pkg/writers/row/example_slice_refs.go new file mode 100644 index 000000000..63e6534a2 --- /dev/null +++ b/pkg/writers/row/example_slice_refs.go @@ -0,0 +1,47 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package row + +// ExampleSliceRefs represent example implemented Slice interface with direct slice processing for slice of references. +// Comparison of slice processing in various ways you can see in bench_slice_test.go. +type ExampleSliceRefs struct { + Rows []*ExampleStruct + rowIdx int +} + +func (s *ExampleSliceRefs) Init(count int32) { + s.rowIdx = 0 + s.Rows = make([]*ExampleStruct, count) +} + +func (s *ExampleSliceRefs) Append(count int32) { + s.Rows = append(s.Rows, make([]*ExampleStruct, count)...) +} + +func (s *ExampleSliceRefs) UnmarshalElem(data []byte) int32 { + if s.rowIdx > s.Len()-1 { + s.Append(1) + } + s.Rows[s.rowIdx] = &ExampleStruct{} + return s.Rows[s.rowIdx].UnmarshalCQL2(data) +} + +func (s *ExampleSliceRefs) NextElem() { + s.rowIdx++ +} + +func (s *ExampleSliceRefs) OnElemError() { + *s.Rows[s.rowIdx] = *new(ExampleStruct) +} + +func (s *ExampleSliceRefs) CutOnDone() { + if s.rowIdx+1 < s.Len() { + s.Rows = s.Rows[:s.rowIdx+1] + } +} + +func (s *ExampleSliceRefs) Len() int { + return len(s.Rows) +} diff --git a/pkg/writers/row/example_struct.go b/pkg/writers/row/example_struct.go new file mode 100644 index 000000000..9e8202cc7 --- /dev/null +++ b/pkg/writers/row/example_struct.go @@ -0,0 +1,81 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package row + +import ( + "fmt" +) + +type ExampleStruct struct { + val0 int32 + val1 int32 + val2 int32 +} + +func (e *ExampleStruct) UnmarshalCQL2(data []byte) int32 { + write := int32(0) + elemLen := int32(0) + e.val0, write = decodeIntToInt32(data[:8]) + e.val1, elemLen = decodeIntToInt32(data[write : write+8]) + write += elemLen + e.val2, elemLen = decodeIntToInt32(data[write : write+8]) + write += elemLen + return write +} + +func unmarshalToExample(e *ExampleStruct, data []byte) int32 { + write := int32(0) + elemLen := int32(0) + e.val0, write = decodeIntToInt32(data[:8]) + e.val1, elemLen = decodeIntToInt32(data[write : write+8]) + write += elemLen + e.val2, elemLen = decodeIntToInt32(data[write : write+8]) + write += elemLen + return write +} + +func unmarshalExample(data []byte) (int32, ExampleStruct) { + out := ExampleStruct{} + write := int32(0) + elemLen := int32(0) + out.val0, write = decodeIntToInt32(data[:8]) + out.val1, elemLen = decodeIntToInt32(data[write : write+8]) + write += elemLen + out.val2, elemLen = decodeIntToInt32(data[write : write+8]) + write += elemLen + return write, out +} + +func unmarshalRefExample(data []byte) (int32, *ExampleStruct) { + out := ExampleStruct{} + write := int32(0) + elemLen := int32(0) + out.val0, write = decodeIntToInt32(data[:8]) + out.val1, elemLen = decodeIntToInt32(data[write : write+8]) + write += elemLen + out.val2, elemLen = decodeIntToInt32(data[write : write+8]) + write += elemLen + return write, &out +} + +func (e *ExampleStruct) GetID() int32 { + return e.val0 +} + +func decodeIntToInt32(data []byte) (out int32, read int32) { + valueLen := dec4ToInt32(data[:4]) + switch { + case valueLen == 4: + return dec4ToInt32(data[4:8]), 8 + case valueLen <= 0: + return int32(0), 4 + default: + panic(fmt.Sprintf("wrong value len type should:4 have:%d, data:%v", valueLen, data[:8])) + } +} + +func dec4ToInt32(data []byte) int32 { + return int32(data[0])<<24 | int32(data[1])<<16 | int32(data[2])<<8 | int32(data[3]) +} diff --git a/pkg/writers/row/utils.go b/pkg/writers/row/utils.go new file mode 100644 index 000000000..9b6dc9968 --- /dev/null +++ b/pkg/writers/row/utils.go @@ -0,0 +1,60 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package row + +import ( + "github.com/pkg/errors" + "math/rand" + "time" +) + +func randInts(rows, columns int) []byte { + out := make([]byte, 0, rows*columns*8) + for i := 0; i < rows*3; i++ { + out = append(out, randInt32(rnd, 2147483647, -2147483648)...) + } + return out +} + +func randInt32(rnd *rand.Rand, max, min int32) []byte { + t := rnd.Int31n(4) + if t == 0 { + return []byte{255, 255, 255, 255} + } + if t == 1 { + return []byte{0, 0, 0, 0} + } + if max == min { + t = max + } else { + if min < 0 { + t = rnd.Int31n(max) + min + } else { + t = rnd.Int31n(max-min) + min + } + } + return append([]byte{0, 0, 0, 4}, []byte{byte(t >> 24), byte(t >> 16), byte(t >> 8), byte(t)}...) +} + +func dec2ToInt16Err(data []byte) (int16, error) { + if len(data) < 2 { + return 0, errors.New("low len") + } + return int16(data[0])<<8 | int16(data[1]), nil +} + +func dec2ToInt16(data []byte) int16 { + return int16(data[0])<<8 | int16(data[1]) +} + +func randBytes(bytesLen int) []byte { + out := make([]byte, bytesLen) + for idx := range out { + out[idx] = byte(rnd.Intn(256)) + } + return out +} + +var rnd = rand.New(rand.NewSource(time.Now().UnixNano())) diff --git a/pkg/writers/row/writer_map.go b/pkg/writers/row/writer_map.go new file mode 100644 index 000000000..89f38fed1 --- /dev/null +++ b/pkg/writers/row/writer_map.go @@ -0,0 +1,72 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package row + +import ( + "fmt" + "github.com/gocql/gocql/pkg/writers" + "io" + "strings" +) + +func InitToMapWriter(rows writers.Map, columnsInRow int) writers.Rows { + return &ToMapWriter{ + rows: rows, + columns: columnsInRow, + } +} + +// ToMapWriter it`s implementation of writers.Rows interface, designed for write rows to the Map`s. +type ToMapWriter struct { + rows writers.Map + columns int +} + +func (r *ToMapWriter) ColumnsInRow() int { + return r.columns +} + +func (r *ToMapWriter) Prepare(_ int32) { +} + +func (r *ToMapWriter) ReUse(rows writers.Map) { + r.rows = rows +} + +func (r *ToMapWriter) WriteRow(data []byte) (write int32, err error) { + // In usual case don`t need to check data on every read operation, because if unmarshalers already tested and data r right we should just read. + // In case wrong data or wrong unmarshaler will result to wrong all rows in response, that`s why we can just catch panic once and make error. + defer func() { + if errF := recover(); errF != nil { + if strings.Contains(fmt.Sprintf("%T", errF), "runtime.boundsError") { + err = io.ErrUnexpectedEOF + } else { + err = fmt.Errorf("%s", errF) + } + r.rows.OnMapElemError() + } + }() + write = r.rows.UnmarshalMapElem(data) + return +} + +func (r *ToMapWriter) WriteRows(data []byte) (write int32, err error) { + // In usual case don`t need to check data on every read operation, because if unmarshalers already tested and data r right we should just read. + // In case wrong data or wrong unmarshaler will result to wrong all rows in response, that`s why we can just catch panic once and make error. + defer func() { + if errF := recover(); errF != nil { + if strings.Contains(fmt.Sprintf("%T", errF), "runtime.boundsError") { + err = io.ErrUnexpectedEOF + } else { + err = fmt.Errorf("%s", errF) + } + r.rows.OnMapElemError() + } + }() + for int(write) < len(data) { + write += r.rows.UnmarshalMapElem(data[write:]) + } + return +} diff --git a/pkg/writers/row/writer_map_test.go b/pkg/writers/row/writer_map_test.go new file mode 100644 index 000000000..7c476241c --- /dev/null +++ b/pkg/writers/row/writer_map_test.go @@ -0,0 +1 @@ +package row diff --git a/pkg/writers/row/writer_slice.go b/pkg/writers/row/writer_slice.go new file mode 100644 index 000000000..304628cdc --- /dev/null +++ b/pkg/writers/row/writer_slice.go @@ -0,0 +1,81 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package row + +import ( + "fmt" + "github.com/gocql/gocql/pkg/writers" + "io" + "strings" +) + +func InitToSliceWriter(rows writers.Slice, columnsInRow int) ToSliceWriter { + return ToSliceWriter{ + rows: rows, + columns: columnsInRow, + } +} + +// ToSliceWriter it`s implementation of writers.Rows interface, designed for write rows to the Slice`s. +type ToSliceWriter struct { + rows writers.Slice + columns int +} + +func (r *ToSliceWriter) ColumnsInRow() int { + return r.columns +} + +func (r *ToSliceWriter) ReUse(rows writers.Slice) { + r.rows = rows +} + +func (r *ToSliceWriter) Prepare(rows int32) { + if rows == 0 { + return + } + if r.rows.Len() < 1 { + r.rows.Init(rows) + } + r.rows.Append(rows) +} + +func (r *ToSliceWriter) WriteRow(data []byte) (write int32, err error) { + // In usual case don`t need to check data on every read operation, because if unmarshalers already tested and data r right we should just read. + // In case wrong data or wrong unmarshaler will result to wrong all rows in response, that`s why we can just catch panic once and make error. + defer func() { + if errF := recover(); errF != nil { + if strings.Contains(fmt.Sprintf("%T", errF), "runtime.boundsError") { + err = io.ErrUnexpectedEOF + } else { + err = fmt.Errorf("%s", errF) + } + r.rows.OnElemError() + } + }() + write = r.rows.UnmarshalElem(data) + r.rows.NextElem() + return +} + +func (r *ToSliceWriter) WriteRows(data []byte) (write int32, err error) { + // In usual case don`t need to check data on every read operation, because if unmarshalers already tested and data r right we should just read. + // In case wrong data or wrong unmarshaler will result to wrong all rows in response, that`s why we can just catch panic once and make error. + defer func() { + if errF := recover(); errF != nil { + if strings.Contains(fmt.Sprintf("%T", errF), "runtime.boundsError") { + err = io.ErrUnexpectedEOF + } else { + err = fmt.Errorf("%s", errF) + } + r.rows.OnElemError() + } + }() + for int(write) < len(data) { + write += r.rows.UnmarshalElem(data[write:]) + r.rows.NextElem() + } + return +} diff --git a/pkg/writers/row_gen/go.mod b/pkg/writers/row_gen/go.mod new file mode 100644 index 000000000..7d2bb76b2 --- /dev/null +++ b/pkg/writers/row_gen/go.mod @@ -0,0 +1,3 @@ +module github.com/gocql/pkg/writers/row_gen + +go 1.18 diff --git a/pkg/writers/row_gen/go.sum b/pkg/writers/row_gen/go.sum new file mode 100644 index 000000000..e69de29bb diff --git a/pkg/writers/row_gen/writer_ref_slice.go b/pkg/writers/row_gen/writer_ref_slice.go new file mode 100644 index 000000000..75db152b2 --- /dev/null +++ b/pkg/writers/row_gen/writer_ref_slice.go @@ -0,0 +1,98 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package row_gen + +import ( + "fmt" + "io" + "strings" +) + +type WriteToFunc[V any] func(val *V, data []byte) int32 + +func InitRefSliceWriter[V any](rows *[]V, writer WriteToFunc[V], columnsInRow int) ToSliceWriter[V] { + return ToSliceWriter[V]{ + write: writer, + rows: rows, + columns: columnsInRow, + } +} + +// ToSliceWriter it`s implementation of writers.RowsWriter interface, designed for write rows to the go slices. +type ToSliceWriter[V any] struct { + write WriteToFunc[V] + rows *[]V + rowIdx int + columns int +} + +func (r *ToSliceWriter[V]) ColumnsInRow() int { + return r.columns +} + +func (r *ToSliceWriter[V]) Prepare(rows int32) { + if rows == 0 { + return + } + if len(*r.rows) < 1 { + *r.rows = make([]V, rows) + } + *r.rows = append(*r.rows, make([]V, rows)...) +} + +func (r *ToSliceWriter[V]) CutOnDone() { + if r.rowIdx+1 < len(*r.rows) { + *r.rows = (*r.rows)[:r.rowIdx+1] + } +} + +func (r *ToSliceWriter[V]) ReUse(rows *[]V) { + r.rows = rows + r.rowIdx = 0 +} + +func (r *ToSliceWriter[V]) WriteRow(data []byte) (write int32, err error) { + // In usual case don`t need to check data on every read operation, because if unmarshalers already tested and data r right we should just read. + // In case wrong data or wrong unmarshaler will result to wrong all rows in response, that`s why we can just catch panic once and make error. + defer func() { + if errF := recover(); errF != nil { + if strings.Contains(fmt.Sprintf("%T", errF), "runtime.boundsError") { + err = io.ErrUnexpectedEOF + } else { + err = fmt.Errorf("%s", errF) + } + (*r.rows)[r.rowIdx] = *new(V) + } + }() + if r.rowIdx > len(*r.rows)-1 { + *r.rows = append(*r.rows, *new(V)) + } + write = r.write(&(*r.rows)[r.rowIdx], data) + r.rowIdx++ + return +} + +func (r *ToSliceWriter[V]) WriteRows(data []byte) (write int32, err error) { + // In usual case don`t need to check data on every read operation, because if unmarshalers already tested and data r right we should just read. + // In case wrong data or wrong unmarshaler will result to wrong all rows in response, that`s why we can just catch panic once and make error. + defer func() { + if errF := recover(); errF != nil { + if strings.Contains(fmt.Sprintf("%T", errF), "runtime.boundsError") { + err = io.ErrUnexpectedEOF + } else { + err = fmt.Errorf("%s", errF) + } + (*r.rows)[r.rowIdx] = *new(V) + } + }() + for int(write) < len(data) { + if r.rowIdx > len(*r.rows)-1 { + *r.rows = append(*r.rows, *new(V)) + } + write += r.write(&(*r.rows)[r.rowIdx], data[write:]) + r.rowIdx++ + } + return +} diff --git a/pkg/writers/row_gen/writer_ref_slice_ref.go b/pkg/writers/row_gen/writer_ref_slice_ref.go new file mode 100644 index 000000000..9068cd943 --- /dev/null +++ b/pkg/writers/row_gen/writer_ref_slice_ref.go @@ -0,0 +1,101 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package row_gen + +import ( + "fmt" + "io" + "strings" +) + +type InitFunc[V any] func() V + +func InitRefSliceRefsWriter[V any](rows *[]*V, writer WriteToFunc[V], columnsInRow int) ToSliceRefsWriter[V] { + return ToSliceRefsWriter[V]{ + write: writer, + rows: rows, + columns: columnsInRow, + } +} + +// ToSliceRefsWriter it`s implementation of writers.RowsWriter interface, designed for write rows to the go slices of references. +type ToSliceRefsWriter[V any] struct { + write WriteToFunc[V] + rows *[]*V + rowIdx int + minRowLen int + columns int +} + +func (r *ToSliceRefsWriter[V]) ColumnsInRow() int { + return r.columns +} + +func (r *ToSliceRefsWriter[V]) Prepare(rows int32) { + if rows == 0 { + return + } + if len(*r.rows) < 1 { + *r.rows = make([]*V, rows) + } + *r.rows = append(*r.rows, make([]*V, rows)...) +} + +func (r *ToSliceRefsWriter[V]) CutOnDone() { + if r.rowIdx+1 < len(*r.rows) { + *r.rows = (*r.rows)[:r.rowIdx+1] + } +} + +func (r *ToSliceRefsWriter[V]) ReUse(rows *[]*V) { + r.rows = rows + r.rowIdx = 0 +} + +func (r *ToSliceRefsWriter[V]) WriteRow(data []byte) (write int32, err error) { + // In usual case don`t need to check data on every read operation, because if unmarshalers already tested and data r right we should just read. + // In case wrong data or wrong unmarshaler will result to wrong all rows in response, that`s why we can just catch panic once and make error. + defer func() { + if errF := recover(); errF != nil { + if strings.Contains(fmt.Sprintf("%T", errF), "runtime.boundsError") { + err = io.ErrUnexpectedEOF + } else { + err = fmt.Errorf("%s", errF) + } + } + }() + if r.rowIdx > len(*r.rows)-1 { + *r.rows = append(*r.rows, new(V)) + } else { + (*r.rows)[r.rowIdx] = new(V) + } + write = r.write((*r.rows)[r.rowIdx], data) + r.rowIdx++ + return +} + +func (r *ToSliceRefsWriter[V]) WriteRows(data []byte) (write int32, err error) { + // In usual case don`t need to check data on every read operation, because if unmarshalers already tested and data r right we should just read. + // In case wrong data or wrong unmarshaler will result to wrong all rows in response, that`s why we can just catch panic once and make error. + defer func() { + if errF := recover(); errF != nil { + if strings.Contains(fmt.Sprintf("%T", errF), "runtime.boundsError") { + err = io.ErrUnexpectedEOF + } else { + err = fmt.Errorf("%s", errF) + } + } + }() + for int(write) < len(data) { + if r.rowIdx > len(*r.rows)-1 { + *r.rows = append(*r.rows, new(V)) + } else { + (*r.rows)[r.rowIdx] = new(V) + } + write += r.write((*r.rows)[r.rowIdx], data[write:]) + r.rowIdx++ + } + return +} diff --git a/pkg/writers/solo_row/interface.go b/pkg/writers/solo_row/interface.go new file mode 100644 index 000000000..17d81cf10 --- /dev/null +++ b/pkg/writers/solo_row/interface.go @@ -0,0 +1,5 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package solo_row diff --git a/pkg/writers/solo_row/solo_ref.go b/pkg/writers/solo_row/solo_ref.go new file mode 100644 index 000000000..0e6cc6c22 --- /dev/null +++ b/pkg/writers/solo_row/solo_ref.go @@ -0,0 +1,51 @@ +// Copyright (c) 2012 The gocql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package solo_row + +import ( + "fmt" + "github.com/gocql/gocql/pkg/writers" + "io" + "strings" +) + +func InitSoloRefWriter(rowRef writers.Struct, columnsInRow int) ToStruct { + return ToStruct{ + row: rowRef, + columns: columnsInRow, + } +} + +// ToStruct optimized for responses with only one row. +// implement Row +type ToStruct struct { + row writers.Struct + columns int +} + +func (r *ToStruct) ColumnsInRow() int { + return r.columns +} + +// ReUse puts new reference into scanner. Useful for scanner reuse. +func (r *ToStruct) ReUse(rowRef writers.Struct) { + r.row = rowRef +} + +func (r *ToStruct) WriteRow(data []byte) (write int32, err error) { + // In usual case don`t need to check data on every read operation, because if unmarshalers already tested and data r right we should just read. + // In case wrong data or wrong unmarshaler will result to wrong all rows in response, that`s why we can just catch panic once and make error. + defer func() { + if errF := recover(); errF != nil { + if strings.Contains(fmt.Sprintf("%T", errF), "runtime.boundsError") { + err = io.ErrUnexpectedEOF + } else { + err = fmt.Errorf("%s", errF) + } + } + }() + write = r.row.UnmarshalCQL2(data) + return +} diff --git a/session.go b/session.go index 0eac4cf0e..06a653ab6 100644 --- a/session.go +++ b/session.go @@ -19,6 +19,7 @@ import ( "unicode" "github.com/gocql/gocql/internal/lru" + "github.com/gocql/gocql/pkg/writers" ) // Session is the interface used by users to interact with the database. @@ -1418,6 +1419,58 @@ type Iter struct { closed int32 } +func (iter *Iter) WriteRows(writer writers.Rows) error { + if iter.err != nil { + return iter.Close() + } + if iter.meta.colCount != writer.ColumnsInRow() { + return fmt.Errorf("WriteRows error. columns count in response %d, %T designed for %d columns", iter.meta.colCount, writer, writer.ColumnsInRow()) + } + var err error + if iter.numRows == 1 { + err = iter.framer.writeRow(writer) + } else { + err = iter.framer.writeRows(writer, iter.numRows) + } + if err != nil { + iter.err = err + return iter.Close() + } + if iter.framer.notEmpty() { + return io.ErrUnexpectedEOF + } + if iter.next != nil { + next := iter.next.fetch() + if next.err != nil { + return next.Close() + } + err = next.WriteRows(writer) + if err != nil { + next.err = err + } + iter.err = next.Close() + } + return iter.Close() +} + +func (iter *Iter) WriteSolitaryRow(writer writers.Row) error { + if iter.err != nil { + return iter.Close() + } + var err error + if iter.meta.colCount != writer.ColumnsInRow() { + return fmt.Errorf("WriteRows error. columns count in response %d, %T designed for %d columns", iter.meta.colCount, writer, writer.ColumnsInRow()) + } + err = iter.framer.writeSolitaryRow(writer) + if err != nil { + iter.err = err + } + if iter.framer.notEmpty() { + iter.err = io.ErrUnexpectedEOF + } + return iter.Close() +} + // Host returns the host which the query was sent to. func (iter *Iter) Host() *HostInfo { return iter.host