Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pgwire: pre-allocate a temporary buffer to use when serializing arrays/tuples #66941

Merged
merged 1 commit into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/cmd/generate-binary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,5 +544,7 @@ var inputs = map[string][]string{
`1::char(1) COLLATE "en_US"`,
`1::varchar(4) COLLATE "en_US"`,
`1::text COLLATE "en_US"`,
`1::int8,(2::int8,3::int8)`,
`1::int8,('hi'::TEXT,3::int2)`,
},
}
22 changes: 20 additions & 2 deletions pkg/sql/pgwire/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,19 @@ func TestEncodings(t *testing.T) {
// Skip floats because postgres rounds them differently than Go.
continue
case *tree.DTuple:
// Unsupported.
continue

hasCollatedString := false
for _, elem := range tc.Datum.ResolvedType().TupleContents() {
if elem.Family() == types.CollatedStringFamily {
hasCollatedString = true
}
}

if hasCollatedString {
// Unsupported.
continue

}
case *tree.DCollatedString:
// Decoding collated strings is unsupported by this test. The encoded
// value is the same as a normal string, so decoding it turns it into
Expand All @@ -250,6 +261,13 @@ func TestEncodings(t *testing.T) {
pgwirebase.FormatText: tc.TextAsBinary,
pgwirebase.FormatBinary: tc.Binary,
} {

if _, ok := tc.Datum.(*tree.DTuple); ok && code == pgwirebase.FormatText {
// Decoding a Tuple from text as binary is not possible as the
// column types cannot be determined.
continue
}

d, err := pgwirebase.DecodeDatum(
&evalCtx,
types.OidToType[tc.Oid],
Expand Down
56 changes: 56 additions & 0 deletions pkg/sql/pgwire/pgwirebase/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,8 @@ func DecodeDatum(
}
case FormatBinary:
switch id {
case oid.T_record:
return decodeBinaryTuple(evalCtx, t, b)
case oid.T_bool:
if len(b) > 0 {
switch b[0] {
Expand Down Expand Up @@ -950,6 +952,60 @@ func decodeBinaryArray(
return arr, nil
}

// decodeBinaryTuple decodes a tuple (record) datum from its binary wire
// representation in b.
//
// The layout consumed here is:
//
//	int32  number of elements
//	then, for each element:
//	  int32  element type OID
//	  int32  element length in bytes, or -1 for NULL
//	  bytes  element payload (absent for NULL)
//
// All integers are big-endian. Element types are taken from the OIDs embedded
// in the payload via types.OidToType; the t argument is not consulted. Each
// non-NULL element is decoded recursively with DecodeDatum in FormatBinary.
//
// The payload is client-supplied, so every count, OID and length is validated
// before it is used to allocate or slice; malformed input yields a
// pgcode.Syntax error rather than a panic.
func decodeBinaryTuple(evalCtx *tree.EvalContext, t *types.T, b []byte) (tree.Datum, error) {
	if len(b) < 4 {
		return nil, pgerror.Newf(pgcode.Syntax, "tuple requires a 4 byte header for binary format")
	}

	numberOfElements := int32(binary.BigEndian.Uint32(b[0:4]))
	if numberOfElements < 0 {
		// A negative count would otherwise make the allocations below panic.
		return nil, pgerror.Newf(pgcode.Syntax,
			"tuple requires a non-negative number of elements for binary format, got %d", numberOfElements)
	}

	// Every element occupies at least 8 bytes (OID + length), so never
	// pre-allocate for more elements than the payload could possibly hold.
	// A truncated payload is still reported by the per-element checks below.
	prealloc := (len(b) - 4) / 8
	if int(numberOfElements) < prealloc {
		prealloc = int(numberOfElements)
	}
	typs := make([]*types.T, 0, prealloc)
	datums := make(tree.Datums, 0, prealloc)

	// pos is an int and all bounds checks are written as "remaining < need"
	// so that offset arithmetic cannot overflow.
	pos := 4
	for idx := int32(0); idx < numberOfElements; idx++ {
		if len(b)-pos < 4 {
			return nil, pgerror.Newf(pgcode.Syntax, "tuple requires 4 bytes for each element OID for binary format")
		}
		elementOID := oid.Oid(binary.BigEndian.Uint32(b[pos : pos+4]))
		elementType, ok := types.OidToType[elementOID]
		if !ok {
			// Without this check a nil type would be handed to DecodeDatum
			// and types.MakeTuple.
			return nil, pgerror.Newf(pgcode.Syntax,
				"tuple element %d has unknown type OID %d for binary format", idx, elementOID)
		}
		pos += 4

		if len(b)-pos < 4 {
			return nil, pgerror.Newf(pgcode.Syntax, "tuple requires 4 bytes for the size of each element for binary format")
		}
		elementLength := int32(binary.BigEndian.Uint32(b[pos : pos+4]))
		pos += 4

		// A length of -1 marks a NULL element, which carries no payload.
		if elementLength == -1 {
			typs = append(typs, elementType)
			datums = append(datums, tree.DNull)
			continue
		}
		if elementLength < 0 {
			// Any other negative length would produce an inverted slice range.
			return nil, pgerror.Newf(pgcode.Syntax,
				"tuple element %d has invalid length %d for binary format", idx, elementLength)
		}
		if len(b)-pos < int(elementLength) {
			return nil, pgerror.Newf(pgcode.Syntax, "tuple requires %d bytes for element %d for binary format", elementLength, idx)
		}

		end := pos + int(elementLength)
		colDatum, err := DecodeDatum(evalCtx, elementType, FormatBinary, b[pos:end])
		if err != nil {
			return nil, err
		}
		pos = end

		typs = append(typs, elementType)
		datums = append(datums, colDatum)
	}

	return tree.NewDTuple(types.MakeTuple(typs), datums...), nil
}

var invalidUTF8Error = pgerror.Newf(pgcode.CharacterNotInRepertoire, "invalid UTF-8 sequence")

var (
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/pgwire/testdata/encodings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2008,6 +2008,20 @@
"TextAsBinary": [40, 49, 44, 41],
"Binary": [0, 0, 0, 2, 0, 0, 0, 25, 0, 0, 0, 1, 49, 0, 0, 2, 193, 255, 255, 255, 255]
},
{
"SQL": "(1::int8,(2::int8,3::int8),null)",
"Oid": 2249,
"Text": "(1,\"(2,3)\",)",
"TextAsBinary": [40, 49, 44, 34, 40, 50, 44, 51, 41, 34, 44, 41],
"Binary": [0, 0, 0, 3, 0, 0, 0, 20, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 8, 201, 0, 0, 0, 36, 0, 0, 0, 2, 0, 0, 0, 20, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 20, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 2, 193, 255, 255, 255, 255]
},
{
"SQL": "(1::int8,('hi'::TEXT,3::int2),null)",
"Oid": 2249,
"Text": "(1,\"(hi,3)\",)",
"TextAsBinary": [40, 49, 44, 34, 40, 104, 105, 44, 51, 41, 34, 44, 41],
"Binary": [0, 0, 0, 3, 0, 0, 0, 20, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 8, 201, 0, 0, 0, 24, 0, 0, 0, 2, 0, 0, 0, 25, 0, 0, 0, 2, 104, 105, 0, 0, 0, 21, 0, 0, 0, 2, 0, 3, 0, 0, 2, 193, 255, 255, 255, 255]
},
{
"SQL": "B''",
"Oid": 1560,
Expand Down
41 changes: 26 additions & 15 deletions pkg/sql/pgwire/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,17 +648,22 @@ func writeBinaryDatumNotNull(
writeBinaryInterval(b, v.Duration)

case *tree.DTuple:
// TODO(andrei): We shouldn't be allocating a new buffer for every array.
subWriter := newWriteBuffer(nil /* bytecount */)
initialLen := b.Len()

// Reserve bytes for writing length later.
b.putInt32(int32(0))

// Put the number of datums.
subWriter.putInt32(int32(len(v.D)))
b.putInt32(int32(len(v.D)))
tupleTypes := t.TupleContents()
for i, elem := range v.D {
oid := tupleTypes[i].Oid()
subWriter.putInt32(int32(oid))
subWriter.writeBinaryDatum(ctx, elem, sessionLoc, tupleTypes[i])
b.putInt32(int32(oid))
b.writeBinaryDatum(ctx, elem, sessionLoc, tupleTypes[i])
}
b.writeLengthPrefixedBuffer(&subWriter.wrapped)

lengthToWrite := b.Len() - (initialLen + 4)
b.putInt32AtIndex(initialLen /* index to write at */, int32(lengthToWrite))

case *tree.DBox2D:
b.putInt32(32)
Expand All @@ -681,30 +686,36 @@ func writeBinaryDatumNotNull(
"binenc", "unsupported binary serialization of multidimensional arrays"))
return
}
// TODO(andrei): We shouldn't be allocating a new buffer for every array.
subWriter := newWriteBuffer(nil /* bytecount */)

initialLen := b.Len()

// Reserve bytes for writing length later.
b.putInt32(int32(0))

// Put the number of dimensions. We currently support 1d arrays only.
var ndims int32 = 1
if v.Len() == 0 {
ndims = 0
}
subWriter.putInt32(ndims)
b.putInt32(ndims)
hasNulls := 0
if v.HasNulls {
hasNulls = 1
}
oid := v.ParamTyp.Oid()
subWriter.putInt32(int32(hasNulls))
subWriter.putInt32(int32(oid))
b.putInt32(int32(hasNulls))
b.putInt32(int32(oid))
if v.Len() > 0 {
subWriter.putInt32(int32(v.Len()))
b.putInt32(int32(v.Len()))
// Lower bound, we only support a lower bound of 1.
subWriter.putInt32(1)
b.putInt32(1)
for _, elem := range v.Array {
subWriter.writeBinaryDatum(ctx, elem, sessionLoc, v.ParamTyp)
b.writeBinaryDatum(ctx, elem, sessionLoc, v.ParamTyp)
}
}
b.writeLengthPrefixedBuffer(&subWriter.wrapped)

lengthToWrite := b.Len() - (initialLen + 4)
b.putInt32AtIndex(initialLen /* index to write at */, int32(lengthToWrite))

case *tree.DJSON:
writeBinaryJSON(b, v.JSON)
Expand Down
19 changes: 8 additions & 11 deletions pkg/sql/pgwire/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func (b *writeBuffer) writeString(s string) {
}
}

// Len returns the value reported by b.wrapped.Len(), i.e. the current length
// of the wrapped buffer.
func (b *writeBuffer) Len() int {
return b.wrapped.Len()
}

func (b *writeBuffer) nullTerminate() {
if b.err == nil {
b.err = b.wrapped.WriteByte(0)
Expand All @@ -97,17 +101,6 @@ func (b *writeBuffer) writeFromFmtCtx(fmtCtx *tree.FmtCtx) {
}
}

// writeLengthPrefixedBuffer appends buf's length as an int32 followed by
// buf's contents. It does nothing if b already carries an error.
func (b *writeBuffer) writeLengthPrefixedBuffer(buf *bytes.Buffer) {
	if b.err != nil {
		return
	}

	// Capture the length before draining: bytes.Buffer.WriteTo resets buf.
	payloadLen := int32(buf.Len())
	b.putInt32(payloadLen)

	_, writeErr := buf.WriteTo(&b.wrapped)
	b.err = writeErr
}

// writeLengthPrefixedString writes a length-prefixed string. The
// length is encoded as an int32.
func (b *writeBuffer) writeLengthPrefixedString(s string) {
Expand Down Expand Up @@ -149,6 +142,10 @@ func (b *writeBuffer) putInt64(v int64) {
}
}

// putInt32AtIndex overwrites the four bytes at [index, index+4) of the
// wrapped buffer with v encoded as a big-endian int32. It does not append;
// the target bytes must already have been written (for example a length
// placeholder that is back-patched once the payload size is known).
//
// Like the other writers on writeBuffer that guard on b.err, this is a no-op
// once an error has been recorded: in that state the placeholder may never
// have been appended, so patching would touch bytes past the written data.
func (b *writeBuffer) putInt32AtIndex(index int, v int32) {
	if b.err != nil {
		return
	}
	binary.BigEndian.PutUint32(b.wrapped.Bytes()[index:index+4], uint32(v))
}

func (b *writeBuffer) putErrFieldMsg(field pgwirebase.ServerErrFieldType) {
if b.err == nil {
b.err = b.wrapped.WriteByte(byte(field))
Expand Down