Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENG-12854: Unreasonable memory usage of the Go driver in sync mode #41

Merged
merged 7 commits into from Jul 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Expand Up @@ -2,6 +2,10 @@ test:
go test -v ./wire
go test -v ./voltdbclient

test-race:
go test -v -race ./wire
go test -v -race ./voltdbclient

bench:
go test -run none -v -bench=. ./voltdbclient

Expand Down
3 changes: 3 additions & 0 deletions examples/voter/benchmark_runner.go
Expand Up @@ -25,6 +25,7 @@ import (
"log"
"os"
"reflect"
"runtime"
"runtime/pprof"
"sync/atomic"
"time"
Expand Down Expand Up @@ -349,6 +350,7 @@ func takeMemProfile() {
if err != nil {
log.Fatal(err)
}
runtime.GC()
pprof.WriteHeapProfile(f)
f.Close()
return
Expand Down Expand Up @@ -443,6 +445,7 @@ func main() {

setupProfiler()
defer teardownProfiler()
defer takeMemProfile()

bm, _ = newBenchmark()
bm.runBenchmark()
Expand Down
52 changes: 0 additions & 52 deletions voltdbclient/network_writer.go
Expand Up @@ -18,61 +18,9 @@
package voltdbclient

import (
"io"
"sync"

"github.com/VoltDB/voltdb-client-go/wire"
)

type networkWriter struct {
bp bool
bpMutex sync.RWMutex
}

func newNetworkWriter() *networkWriter {
var nw = new(networkWriter)
nw.bp = false
return nw
}

func (nw *networkWriter) writePIs(writer io.Writer, piCh <-chan *procedureInvocation, wg *sync.WaitGroup) {
e := wire.NewEncoder()
for pi := range piCh {
e.Reset()
EncodePI(e, pi)
writer.Write(e.Bytes())
}
wire.PutEncoder(e)
wg.Done()
}

func (nw *networkWriter) connect(writer io.Writer, piCh <-chan *procedureInvocation, wg *sync.WaitGroup) {
go nw.writePIs(writer, piCh, wg)
}

func (nw *networkWriter) disconnect(piCh chan *procedureInvocation) {
close(piCh)
}

func (nw *networkWriter) hasBP() bool {
nw.bpMutex.RLock()
bp := nw.bp
nw.bpMutex.RUnlock()
return bp
}

func (nw *networkWriter) setBP() {
nw.bpMutex.Lock()
nw.bp = true
nw.bpMutex.Unlock()
}

func (nw *networkWriter) unsetBP() {
nw.bpMutex.Lock()
nw.bp = false
nw.bpMutex.Unlock()
}

func EncodePI(e *wire.Encoder, pi *procedureInvocation) error {
_, err := e.Int32(int32(pi.getLen()))
if err != nil {
Expand Down
58 changes: 34 additions & 24 deletions voltdbclient/node_conn.go
Expand Up @@ -44,7 +44,9 @@ type nodeConn struct {
closeCh chan chan bool

// channel for pi's meant specifically for this connection.
ncPiCh chan *procedureInvocation
ncPiCh chan *procedureInvocation
decoder *wire.Decoder
encoder *wire.Encoder
}

func newNodeConn(ci string, ncPiCh chan *procedureInvocation) *nodeConn {
Expand All @@ -54,6 +56,8 @@ func newNodeConn(ci string, ncPiCh chan *procedureInvocation) *nodeConn {
nc.bpCh = make(chan chan bool)
nc.closeCh = make(chan chan bool)
nc.drainCh = make(chan chan bool)
nc.decoder = wire.NewDecoder(nil)
nc.encoder = wire.NewEncoder()
return nc
}

Expand Down Expand Up @@ -108,9 +112,10 @@ func (nc *nodeConn) reconnect(protocolVersion int, piCh <-chan *procedureInvocat
}

func (nc *nodeConn) networkConnect(protocolVersion int) (*net.TCPConn, *wire.ConnInfo, error) {
e := wire.NewEncoder()
d := &wire.Decoder{}
defer wire.PutEncoder(e)
defer func() {
nc.decoder.Reset()
nc.encoder.Reset()
}()
u, err := parseURL(nc.connInfo)
if err != nil {
return nil, nil, err
Expand All @@ -124,7 +129,8 @@ func (nc *nodeConn) networkConnect(protocolVersion int) (*net.TCPConn, *wire.Con
return nil, nil, fmt.Errorf("failed to connect to server %v", nc.connInfo)
}
pass, _ := u.User.Password()
login, err := e.Login(protocolVersion, u.User.Username(), pass)
nc.encoder.Reset()
login, err := nc.encoder.Login(protocolVersion, u.User.Username(), pass)
if err != nil {
tcpConn.Close()
return nil, nil, fmt.Errorf("failed to serialize login message %v", nc.connInfo)
Expand All @@ -133,8 +139,9 @@ func (nc *nodeConn) networkConnect(protocolVersion int) (*net.TCPConn, *wire.Con
if err != nil {
return nil, nil, err
}
d.SetReader(tcpConn)
i, err := d.Login()
nc.decoder.Reset()
nc.decoder.SetReader(tcpConn)
i, err := nc.decoder.Login()
if err != nil {
tcpConn.Close()
return nil, nil, fmt.Errorf("failed to login to server %v", nc.connInfo)
Expand All @@ -155,6 +162,7 @@ func (nc *nodeConn) hasBP() bool {
// listen listens for messages from the server and calls back a registered listener.
// listen blocks on input from the server and should be run as a go routine.
func (nc *nodeConn) listen(reader io.Reader, responseCh chan<- *bytes.Buffer) {

d := wire.NewDecoder(reader)
s := &wire.Decoder{}
for {
Expand Down Expand Up @@ -182,6 +190,7 @@ func (nc *nodeConn) listen(reader io.Reader, responseCh chan<- *bytes.Buffer) {
}

func (nc *nodeConn) loop(writer io.Writer, piCh <-chan *procedureInvocation, responseCh <-chan *bytes.Buffer, bpCh <-chan chan bool, drainCh chan chan bool) {

// declare mutable state
requests := make(map[int64]*networkRequest)
ncPiCh := nc.ncPiCh
Expand All @@ -197,7 +206,6 @@ func (nc *nodeConn) loop(writer io.Writer, piCh <-chan *procedureInvocation, res
var pingTimeout = 2 * time.Minute
pingSentTime := time.Now()
var pingOutstanding bool
d := &wire.Decoder{}
for {
// setup select cases
if draining {
Expand Down Expand Up @@ -238,9 +246,9 @@ func (nc *nodeConn) loop(writer io.Writer, piCh <-chan *procedureInvocation, res
case pi := <-piCh:
nc.handleProcedureInvocation(writer, pi, &requests, &queuedBytes)
case resp := <-responseCh:
d.SetReader(resp)
handle, err := d.Int64()
d.Reset()
nc.decoder.SetReader(resp)
handle, err := nc.decoder.Int64()
nc.decoder.Reset()
// can't do anything without a handle. If reading the handle fails,
// then log and drop the message.
if err != nil {
Expand All @@ -257,13 +265,14 @@ func (nc *nodeConn) loop(writer io.Writer, piCh <-chan *procedureInvocation, res
continue
}
queuedBytes -= req.numBytes

delete(requests, handle)
if req.isSync() {

nc.handleSyncResponse(handle, resp, req)
} else {
nc.handleAsyncResponse(handle, resp, req)
}

case respBPCh := <-bpCh:
respBPCh <- bp
case drainRespCh = <-drainCh:
Expand Down Expand Up @@ -291,28 +300,29 @@ func (nc *nodeConn) handleProcedureInvocation(writer io.Writer, pi *procedureInv
}
(*requests)[pi.handle] = nr
*queuedBytes += pi.slen
e := wire.NewEncoder()
EncodePI(e, pi)
writer.Write(e.Bytes())
wire.PutEncoder(e)
nc.encoder.Reset()
EncodePI(nc.encoder, pi)
writer.Write(nc.encoder.Bytes())
nc.encoder.Reset()
}

func (nc *nodeConn) handleSyncResponse(handle int64, r io.Reader, req *networkRequest) {
respCh := req.getChan()
d := wire.NewDecoder(r)
rsp, err := decodeResponse(d, handle)
nc.decoder.SetReader(r)
defer nc.decoder.Reset()
rsp, err := decodeResponse(nc.decoder, handle)
if err != nil {
respCh <- err.(voltResponse)
} else if req.isQuery() {

if rows, err := decodeRows(d, rsp); err != nil {
if rows, err := decodeRows(nc.decoder, rsp); err != nil {
respCh <- err.(voltResponse)
} else {
respCh <- rows
}
} else {

if result, err := decodeResult(d, rsp); err != nil {
if result, err := decodeResult(nc.decoder, rsp); err != nil {
respCh <- err.(voltResponse)
} else {
respCh <- result
Expand Down Expand Up @@ -349,10 +359,10 @@ func (nc *nodeConn) handleAsyncTimeout(req *networkRequest) {

func (nc *nodeConn) sendPing(writer io.Writer) {
pi := newProcedureInvocationByHandle(PingHandle, true, "@Ping", []driver.Value{})
e := wire.NewEncoder()
EncodePI(e, pi)
writer.Write(e.Bytes())
wire.PutEncoder(e)
nc.encoder.Reset()
EncodePI(nc.encoder, pi)
writer.Write(nc.encoder.Bytes())
nc.encoder.Reset()
}

// AsyncResponseConsumer is a type that consumes responses from asynchronous
Expand Down
39 changes: 21 additions & 18 deletions wire/decoder.go
Expand Up @@ -76,11 +76,14 @@ func (d *Decoder) Int32() (int32, error) {
// is for voltdb wire protocol encoded bytes stream. Then the bytes read are
// decoded as uint32 using big endianess
func (d *Decoder) Uint32() (uint32, error) {
b, err := d.read(IntegerSize)
var a [IntegerSize]byte
b := a[:]
_, err := d.r.Read(b)
if err != nil {
return 0, err
}
return endian.Uint32(b), nil
v := endian.Uint32(b)
return v, nil
}

// Int64 reads and decodes voltdb wire protocol encoded []byte to int64.
Expand All @@ -98,11 +101,14 @@ func (d *Decoder) Int64() (int64, error) {
// is for voltdb wire protocol encoded bytes stream. Then the bytes read are
// decoded as uint64 using big endianess
func (d *Decoder) Uint64() (uint64, error) {
b, err := d.read(LongSize)
var a [LongSize]byte
b := a[:]
_, err := d.r.Read(b)
if err != nil {
return 0, err
}
return endian.Uint64(b), nil
v := endian.Uint64(b)
return v, nil
}

// Time reads and decodes voltdb wire protocol encoded []byte to time.Time.
Expand Down Expand Up @@ -150,11 +156,14 @@ func (d *Decoder) String() (string, error) {
// is for voltdb wire protocol encoded bytes stream. Then the bytes read are
// decoded as uint16 using big endianess
func (d *Decoder) Uint16() (uint16, error) {
b, err := d.read(ShortSize)
var a [ShortSize]byte
b := a[:]
_, err := d.r.Read(b)
if err != nil {
return 0, err
}
return endian.Uint16(b), nil
v := endian.Uint16(b)
return v, nil
}

// Int16 reads and decodes voltdb wire protocol encoded []byte to int16.
Expand Down Expand Up @@ -183,15 +192,6 @@ func (d *Decoder) StringSlice() ([]string, error) {
return a, nil
}

func (d *Decoder) read(size int) ([]byte, error) {
b := make([]byte, size)
_, err := d.r.Read(b)
if err != nil {
return nil, err
}
return b, nil
}

// Read implements io.Reader
func (d *Decoder) Read(b []byte) (int, error) {
return d.r.Read(b)
Expand Down Expand Up @@ -225,11 +225,14 @@ func (d *Decoder) MessageHeader() (int32, error) {

// Byte reads and decodes voltdb wire protocol encoded []byte to int8.
func (d *Decoder) Byte() (int8, error) {
b, err := d.read(ByteSize)
var a [ByteSize]byte
b := a[:]
_, err := d.r.Read(b)
if err != nil {
return 0, err
}
return int8(b[0]), nil
v := a[0]
return int8(v), nil
}

// Login decodes response message received after successful logging to a voltdb
Expand Down Expand Up @@ -416,7 +419,7 @@ func (d *DecoderAt) StringAt(offset int64) (string, error) {
// Uint16At decodes voltdb wire protocol encoded []byte read from the given
// offset to uint16.
func (d *DecoderAt) Uint16At(offset int64) (uint16, error) {
b, err := d.readAt(shortSize, offset)
b, err := d.readAt(ShortSize, offset)
if err != nil {
return 0, err
}
Expand Down