Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
add test for translate store buffer growth logic. add max limit to bu…
Browse files Browse the repository at this point in the history
…ffer size.
  • Loading branch information
travisturner committed Dec 18, 2018
1 parent 95f05ca commit 358c32a
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 5 deletions.
65 changes: 65 additions & 0 deletions ctl/import_test.go
Expand Up @@ -18,9 +18,11 @@ import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"reflect"
"strings"
"testing"

Expand Down Expand Up @@ -184,6 +186,69 @@ func TestImportCommand_RunKeys(t *testing.T) {
}
}

// Ensure that import with keys runs with key replication.
func TestImportCommand_KeyReplication(t *testing.T) {
buf := bytes.Buffer{}
stdin, stdout, stderr := GetIO(buf)
cm := NewImportCommand(stdin, stdout, stderr)
file, err := ioutil.TempFile("", "import-key.csv")

// create a large import file in order to test the
// translateStoreBufferSize growth logic.
keyBytes := []byte{}
for row := 0; row < 100; row++ {
for col := 0; col < 100; col++ {
x := fmt.Sprintf("foo%d,bar%d\n", row, col)
keyBytes = append(keyBytes, x...)
}
}
x := "fooEND,barEND"
keyBytes = append(keyBytes, x...)

file.Write(keyBytes)
ctx := context.Background()
if err != nil {
t.Fatal(err)
}

c := test.MustRunCluster(t, 2)
cmd0 := c[0]
cmd1 := c[1]

host0 := cmd0.API.Node().URI.HostPort()
host1 := cmd1.API.Node().URI.HostPort()

cm.Host = host0

http.DefaultClient.Do(MustNewHTTPRequest("POST", "http://"+cm.Host+"/index/i", strings.NewReader(`{"options":{"keys": true}}`)))
http.DefaultClient.Do(MustNewHTTPRequest("POST", "http://"+cm.Host+"/index/i/field/f", strings.NewReader(`{"options":{"keys": true}}`)))

cm.Index = "i"
cm.Field = "f"
cm.Paths = []string{file.Name()}
err = cm.Run(ctx)
if err != nil {
t.Fatalf("Import Run with key replication doesn't work: %s", err)
}

// Verify that the data is available on both nodes.
for _, host := range []string{host0, host1} {
qry := "Count(Row(f=foo0))"
resp, err := http.DefaultClient.Do(MustNewHTTPRequest("POST", "http://"+host+"/index/i/query", strings.NewReader(qry)))
if err != nil {
t.Fatalf("Querying data for validation: %s", err)
}

// Read body and unmarshal response.
exp := `{"results":[100]}` + "\n"
if body, err := ioutil.ReadAll(resp.Body); err != nil {
t.Fatalf("reading: %s", err)
} else if !reflect.DeepEqual(body, []byte(exp)) {
t.Fatalf("expected: %s, but got: %s", exp, body)
}
}
}

// Ensure that integer import with keys runs.
func TestImportCommand_RunValueKeys(t *testing.T) {
buf := bytes.Buffer{}
Expand Down
11 changes: 10 additions & 1 deletion http/handler.go
Expand Up @@ -1426,7 +1426,11 @@ func (h *Handler) handlePostClusterMessage(w http.ResponseWriter, r *http.Reques
type defaultClusterMessageResponse struct{}

// translateStoreBufferSize is the buffer size used for streaming data.
const translateStoreBufferSize = 65536
const translateStoreBufferSize = 1 << 16 // 64k

// translateStoreBufferSizeMax is the maximum size that the buffer is allowed
// to grow before raising an error.
const translateStoreBufferSizeMax = 1 << 22 // 4Mb

func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
Expand Down Expand Up @@ -1459,6 +1463,11 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request)
} else if err == pilosa.ErrTranslateReadTargetUndersized {
// Increase the buffer size and try to read again.
useBufferSize *= 2
// Prevent the buffer from growing without bound.
if useBufferSize > translateStoreBufferSizeMax {
h.logger.Printf("http: translate store buffer exceeded max size: %s", err)
return
}
buf = make([]byte, useBufferSize)
continue
} else if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions http/handler_internal_test.go
Expand Up @@ -57,7 +57,6 @@ func TestPostIndexRequestUnmarshalJSON(t *testing.T) {
t.Errorf("expected: %v, but got: %v for JSON: %s", test.expected, *actual, test.json)
}
}

}
}

Expand Down Expand Up @@ -93,7 +92,6 @@ func TestPostFieldRequestUnmarshalJSON(t *testing.T) {
t.Errorf("test %d: expected: %v, but got: %v", i, test.expected, *actual)
}
}

}
}

Expand Down Expand Up @@ -178,6 +176,5 @@ func TestFieldOptionValidation(t *testing.T) {
t.Errorf("test %d: expected: %v, but got: %v", i, test.expected, *actual)
}
}

}
}
2 changes: 1 addition & 1 deletion test/pilosa.go
Expand Up @@ -69,7 +69,7 @@ func newCommand(opts ...server.CommandOption) *Command {
m.Config.DataDir = path
m.Config.Bind = "http://localhost:0"
m.Config.Cluster.Disabled = true
m.Config.Translation.MapSize = 100000
m.Config.Translation.MapSize = 140000

if testing.Verbose() {
m.Command.Stdout = os.Stdout
Expand Down

0 comments on commit 358c32a

Please sign in to comment.