Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

allow translate log entry buffer to grow #1787

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 65 additions & 0 deletions ctl/import_test.go
Expand Up @@ -18,9 +18,11 @@ import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"reflect"
"strings"
"testing"

Expand Down Expand Up @@ -184,6 +186,69 @@ func TestImportCommand_RunKeys(t *testing.T) {
}
}

// Ensure that import with keys runs with key replication.
func TestImportCommand_KeyReplication(t *testing.T) {
buf := bytes.Buffer{}
stdin, stdout, stderr := GetIO(buf)
cm := NewImportCommand(stdin, stdout, stderr)
file, err := ioutil.TempFile("", "import-key.csv")

// create a large import file in order to test the
// translateStoreBufferSize growth logic.
keyBytes := []byte{}
for row := 0; row < 100; row++ {
for col := 0; col < 100; col++ {
x := fmt.Sprintf("foo%d,bar%d\n", row, col)
keyBytes = append(keyBytes, x...)
}
}
x := "fooEND,barEND"
keyBytes = append(keyBytes, x...)

file.Write(keyBytes)
ctx := context.Background()
if err != nil {
t.Fatal(err)
}

c := test.MustRunCluster(t, 2)
cmd0 := c[0]
cmd1 := c[1]

host0 := cmd0.API.Node().URI.HostPort()
host1 := cmd1.API.Node().URI.HostPort()

cm.Host = host0

http.DefaultClient.Do(MustNewHTTPRequest("POST", "http://"+cm.Host+"/index/i", strings.NewReader(`{"options":{"keys": true}}`)))
http.DefaultClient.Do(MustNewHTTPRequest("POST", "http://"+cm.Host+"/index/i/field/f", strings.NewReader(`{"options":{"keys": true}}`)))

cm.Index = "i"
cm.Field = "f"
cm.Paths = []string{file.Name()}
err = cm.Run(ctx)
if err != nil {
t.Fatalf("Import Run with key replication doesn't work: %s", err)
}

// Verify that the data is available on both nodes.
for _, host := range []string{host0, host1} {
qry := "Count(Row(f=foo0))"
resp, err := http.DefaultClient.Do(MustNewHTTPRequest("POST", "http://"+host+"/index/i/query", strings.NewReader(qry)))
if err != nil {
t.Fatalf("Querying data for validation: %s", err)
}

// Read body and unmarshal response.
exp := `{"results":[100]}` + "\n"
if body, err := ioutil.ReadAll(resp.Body); err != nil {
t.Fatalf("reading: %s", err)
} else if !reflect.DeepEqual(body, []byte(exp)) {
t.Fatalf("expected: %s, but got: %s", exp, body)
}
}
}

// Ensure that integer import with keys runs.
func TestImportCommand_RunValueKeys(t *testing.T) {
buf := bytes.Buffer{}
Expand Down
22 changes: 20 additions & 2 deletions http/handler.go
Expand Up @@ -1426,7 +1426,11 @@ func (h *Handler) handlePostClusterMessage(w http.ResponseWriter, r *http.Reques
type defaultClusterMessageResponse struct{}

// translateStoreBufferSize is the buffer size used for streaming data.
const translateStoreBufferSize = 65536
const translateStoreBufferSize = 1 << 16 // 64k

// translateStoreBufferSizeMax is the maximum size that the buffer is allowed
// to grow before raising an error.
const translateStoreBufferSizeMax = 1 << 22 // 4Mb

func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
Expand All @@ -1449,16 +1453,30 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request)
}

// Copy from reader to client until store or client disconnect.
buf := make([]byte, translateStoreBufferSize)
useBufferSize := translateStoreBufferSize
buf := make([]byte, useBufferSize)
for {
// Read from store.
n, err := rdr.Read(buf)
if err == io.EOF {
return
} else if err == pilosa.ErrTranslateReadTargetUndersized {
// Increase the buffer size and try to read again.
useBufferSize *= 2
// Prevent the buffer from growing without bound.
if useBufferSize > translateStoreBufferSizeMax {
h.logger.Printf("http: translate store buffer exceeded max size: %s", err)
return
}
buf = make([]byte, useBufferSize)
continue
} else if err != nil {
h.logger.Printf("http: translate store read error: %s", err)
return
} else if n == 0 {
// Reset the default buffer size.
useBufferSize = translateStoreBufferSize
buf = make([]byte, useBufferSize)
continue
}

Expand Down
3 changes: 0 additions & 3 deletions http/handler_internal_test.go
Expand Up @@ -57,7 +57,6 @@ func TestPostIndexRequestUnmarshalJSON(t *testing.T) {
t.Errorf("expected: %v, but got: %v for JSON: %s", test.expected, *actual, test.json)
}
}

}
}

Expand Down Expand Up @@ -93,7 +92,6 @@ func TestPostFieldRequestUnmarshalJSON(t *testing.T) {
t.Errorf("test %d: expected: %v, but got: %v", i, test.expected, *actual)
}
}

}
}

Expand Down Expand Up @@ -178,6 +176,5 @@ func TestFieldOptionValidation(t *testing.T) {
t.Errorf("test %d: expected: %v, but got: %v", i, test.expected, *actual)
}
}

}
}
2 changes: 1 addition & 1 deletion test/pilosa.go
Expand Up @@ -69,7 +69,7 @@ func newCommand(opts ...server.CommandOption) *Command {
m.Config.DataDir = path
m.Config.Bind = "http://localhost:0"
m.Config.Cluster.Disabled = true
m.Config.Translation.MapSize = 100000
m.Config.Translation.MapSize = 140000

if testing.Verbose() {
m.Command.Stdout = os.Stdout
Expand Down
18 changes: 12 additions & 6 deletions translate.go
Expand Up @@ -29,10 +29,11 @@ const (
)

var (
ErrTranslateStoreClosed = errors.New("pilosa: translate store closed")
ErrTranslateStoreReaderClosed = errors.New("pilosa: translate store reader closed")
ErrReplicationNotSupported = errors.New("pilosa: replication not supported")
ErrTranslateStoreReadOnly = errors.New("pilosa: translate store could not find or create key, translate store read only")
ErrTranslateStoreClosed = errors.New("pilosa: translate store closed")
ErrTranslateStoreReaderClosed = errors.New("pilosa: translate store reader closed")
ErrReplicationNotSupported = errors.New("pilosa: replication not supported")
ErrTranslateStoreReadOnly = errors.New("pilosa: translate store could not find or create key, translate store read only")
ErrTranslateReadTargetUndersized = errors.New("pilosa: translate read target is undersized")
)

// TranslateStore is the storage for translation string-to-uint64 values.
Expand Down Expand Up @@ -1089,8 +1090,13 @@ func (r *translateFileReader) read(p []byte) (n int, err error) {
return 0, nil
}

// Shorten buffer to maximum read size.
if max := sz - r.offset; int64(len(p)) > max {
if max := sz - r.offset; max > int64(len(p)) {
// If p is not large enough to hold a single entry,
// return an error so the client can increase the
// size of p and try again.
return 0, ErrTranslateReadTargetUndersized
} else if int64(len(p)) > max {
// Shorten buffer to maximum read size.
p = p[:max]
}

Expand Down