From 95f05ca4d00431e981d9d6bfb52335021b5948a7 Mon Sep 17 00:00:00 2001 From: Travis Turner Date: Fri, 14 Dec 2018 16:47:28 -0600 Subject: [PATCH 1/2] WIP: allow translate log entry buffer to grow In the case where a translate log entry contained many key/id pairs, it was possible for the read buffer (which was allocated at 65536 bytes) to fail to handle it. This happened when the serialized LogEntry was larger than 65536 bytes. This PR adds logic which returns a custom error called ErrTranslateReadTargetUndersized notifying the reader to reallocate a larger read buffer and try the read again. TODO: - [ ] Add a max buffer size check to prevent this from doubling the buffer size with no limit. - [ ] Add tests. --- http/handler.go | 11 ++++++++++- translate.go | 18 ++++++++++++------ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/http/handler.go b/http/handler.go index c370656fa..f8188e8fd 100644 --- a/http/handler.go +++ b/http/handler.go @@ -1449,16 +1449,25 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request) } // Copy from reader to client until store or client disconnect. - buf := make([]byte, translateStoreBufferSize) + useBufferSize := translateStoreBufferSize + buf := make([]byte, useBufferSize) for { // Read from store. n, err := rdr.Read(buf) if err == io.EOF { return + } else if err == pilosa.ErrTranslateReadTargetUndersized { + // Increase the buffer size and try to read again. + useBufferSize *= 2 + buf = make([]byte, useBufferSize) + continue } else if err != nil { h.logger.Printf("http: translate store read error: %s", err) return } else if n == 0 { + // Reset the default buffer size. 
+ useBufferSize = translateStoreBufferSize + buf = make([]byte, useBufferSize) continue } diff --git a/translate.go b/translate.go index 2c3125ad9..b86041e3d 100644 --- a/translate.go +++ b/translate.go @@ -29,10 +29,11 @@ const ( ) var ( - ErrTranslateStoreClosed = errors.New("pilosa: translate store closed") - ErrTranslateStoreReaderClosed = errors.New("pilosa: translate store reader closed") - ErrReplicationNotSupported = errors.New("pilosa: replication not supported") - ErrTranslateStoreReadOnly = errors.New("pilosa: translate store could not find or create key, translate store read only") + ErrTranslateStoreClosed = errors.New("pilosa: translate store closed") + ErrTranslateStoreReaderClosed = errors.New("pilosa: translate store reader closed") + ErrReplicationNotSupported = errors.New("pilosa: replication not supported") + ErrTranslateStoreReadOnly = errors.New("pilosa: translate store could not find or create key, translate store read only") + ErrTranslateReadTargetUndersized = errors.New("pilosa: translate read target is undersized") ) // TranslateStore is the storage for translation string-to-uint64 values. @@ -1089,8 +1090,13 @@ func (r *translateFileReader) read(p []byte) (n int, err error) { return 0, nil } - // Shorten buffer to maximum read size. - if max := sz - r.offset; int64(len(p)) > max { + if max := sz - r.offset; max > int64(len(p)) { + // If p is not large enough to hold a single entry, + // return an error so the client can increase the + // size of p and try again. + return 0, ErrTranslateReadTargetUndersized + } else if int64(len(p)) > max { + // Shorten buffer to maximum read size. p = p[:max] } From 358c32a165ab5ed90cfb1ca070da34682b07773b Mon Sep 17 00:00:00 2001 From: Travis Turner Date: Tue, 18 Dec 2018 12:48:20 -0600 Subject: [PATCH 2/2] add test for translate store buffer growth logic. add max limit to buffer size. 
--- ctl/import_test.go | 65 +++++++++++++++++++++++++++++++++++ http/handler.go | 11 +++++- http/handler_internal_test.go | 3 -- test/pilosa.go | 2 +- 4 files changed, 76 insertions(+), 5 deletions(-) diff --git a/ctl/import_test.go b/ctl/import_test.go index ad4fd6cf0..e701af1b2 100644 --- a/ctl/import_test.go +++ b/ctl/import_test.go @@ -18,9 +18,11 @@ import ( "bufio" "bytes" "context" + "fmt" "io" "io/ioutil" "net/http" + "reflect" "strings" "testing" @@ -184,6 +186,69 @@ func TestImportCommand_RunKeys(t *testing.T) { } } +// Ensure that import with keys runs with key replication. +func TestImportCommand_KeyReplication(t *testing.T) { + buf := bytes.Buffer{} + stdin, stdout, stderr := GetIO(buf) + cm := NewImportCommand(stdin, stdout, stderr) + file, err := ioutil.TempFile("", "import-key.csv") + + // create a large import file in order to test the + // translateStoreBufferSize growth logic. + keyBytes := []byte{} + for row := 0; row < 100; row++ { + for col := 0; col < 100; col++ { + x := fmt.Sprintf("foo%d,bar%d\n", row, col) + keyBytes = append(keyBytes, x...) + } + } + x := "fooEND,barEND" + keyBytes = append(keyBytes, x...) + + file.Write(keyBytes) + ctx := context.Background() + if err != nil { + t.Fatal(err) + } + + c := test.MustRunCluster(t, 2) + cmd0 := c[0] + cmd1 := c[1] + + host0 := cmd0.API.Node().URI.HostPort() + host1 := cmd1.API.Node().URI.HostPort() + + cm.Host = host0 + + http.DefaultClient.Do(MustNewHTTPRequest("POST", "http://"+cm.Host+"/index/i", strings.NewReader(`{"options":{"keys": true}}`))) + http.DefaultClient.Do(MustNewHTTPRequest("POST", "http://"+cm.Host+"/index/i/field/f", strings.NewReader(`{"options":{"keys": true}}`))) + + cm.Index = "i" + cm.Field = "f" + cm.Paths = []string{file.Name()} + err = cm.Run(ctx) + if err != nil { + t.Fatalf("Import Run with key replication doesn't work: %s", err) + } + + // Verify that the data is available on both nodes. 
+ for _, host := range []string{host0, host1} { + qry := "Count(Row(f=foo0))" + resp, err := http.DefaultClient.Do(MustNewHTTPRequest("POST", "http://"+host+"/index/i/query", strings.NewReader(qry))) + if err != nil { + t.Fatalf("Querying data for validation: %s", err) + } + + // Read body and compare it to the expected raw response. + exp := `{"results":[100]}` + "\n" + if body, err := ioutil.ReadAll(resp.Body); err != nil { + t.Fatalf("reading: %s", err) + } else if !reflect.DeepEqual(body, []byte(exp)) { + t.Fatalf("expected: %s, but got: %s", exp, body) + } + } +} + // Ensure that integer import with keys runs. func TestImportCommand_RunValueKeys(t *testing.T) { buf := bytes.Buffer{} diff --git a/http/handler.go b/http/handler.go index f8188e8fd..b85be6b0e 100644 --- a/http/handler.go +++ b/http/handler.go @@ -1426,7 +1426,11 @@ func (h *Handler) handlePostClusterMessage(w http.ResponseWriter, r *http.Reques type defaultClusterMessageResponse struct{} // translateStoreBufferSize is the buffer size used for streaming data. -const translateStoreBufferSize = 65536 +const translateStoreBufferSize = 1 << 16 // 64 KiB + +// translateStoreBufferSizeMax is the maximum size that the buffer is allowed +// to grow before raising an error. +const translateStoreBufferSizeMax = 1 << 22 // 4 MiB func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() @@ -1459,6 +1463,11 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request) } else if err == pilosa.ErrTranslateReadTargetUndersized { // Increase the buffer size and try to read again. useBufferSize *= 2 + // Prevent the buffer from growing without bound.
+ if useBufferSize > translateStoreBufferSizeMax { + h.logger.Printf("http: translate store buffer exceeded max size: %s", err) + return + } buf = make([]byte, useBufferSize) continue } else if err != nil { diff --git a/http/handler_internal_test.go b/http/handler_internal_test.go index 7ca9c0401..ecbf8ff87 100644 --- a/http/handler_internal_test.go +++ b/http/handler_internal_test.go @@ -57,7 +57,6 @@ func TestPostIndexRequestUnmarshalJSON(t *testing.T) { t.Errorf("expected: %v, but got: %v for JSON: %s", test.expected, *actual, test.json) } } - } } @@ -93,7 +92,6 @@ func TestPostFieldRequestUnmarshalJSON(t *testing.T) { t.Errorf("test %d: expected: %v, but got: %v", i, test.expected, *actual) } } - } } @@ -178,6 +176,5 @@ func TestFieldOptionValidation(t *testing.T) { t.Errorf("test %d: expected: %v, but got: %v", i, test.expected, *actual) } } - } } diff --git a/test/pilosa.go b/test/pilosa.go index 3bc43d593..e3858f5e0 100644 --- a/test/pilosa.go +++ b/test/pilosa.go @@ -69,7 +69,7 @@ func newCommand(opts ...server.CommandOption) *Command { m.Config.DataDir = path m.Config.Bind = "http://localhost:0" m.Config.Cluster.Disabled = true - m.Config.Translation.MapSize = 100000 + m.Config.Translation.MapSize = 140000 if testing.Verbose() { m.Command.Stdout = os.Stdout