Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Add context to Client, Executor, & Handler. #139

Merged
merged 1 commit into from Nov 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
163 changes: 117 additions & 46 deletions client.go

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions client_test.go
Expand Up @@ -2,6 +2,7 @@ package pilosa_test

import (
"bytes"
"context"
"reflect"
"testing"

Expand All @@ -27,7 +28,7 @@ func TestClient_Import(t *testing.T) {

// Send import request.
c := MustNewClient(s.Host())
if err := c.Import("d", "f", 0, []pilosa.Bit{
if err := c.Import(context.Background(), "d", "f", 0, []pilosa.Bit{
{BitmapID: 0, ProfileID: 1},
{BitmapID: 0, ProfileID: 5},
{BitmapID: 200, ProfileID: 6},
Expand Down Expand Up @@ -65,12 +66,12 @@ func TestClient_BackupRestore(t *testing.T) {

// Backup from frame.
var buf bytes.Buffer
if err := c.BackupTo(&buf, "d", "f"); err != nil {
if err := c.BackupTo(context.Background(), &buf, "d", "f"); err != nil {
t.Fatal(err)
}

// Restore to a different frame.
if err := c.RestoreFrom(&buf, "x", "y"); err != nil {
if err := c.RestoreFrom(context.Background(), &buf, "x", "y"); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -110,7 +111,7 @@ func TestClient_FragmentBlocks(t *testing.T) {

// Retrieve blocks.
c := MustNewClient(s.Host())
blocks, err := c.FragmentBlocks("d", "f", 0)
blocks, err := c.FragmentBlocks(context.Background(), "d", "f", 0)
if err != nil {
t.Fatal(err)
} else if len(blocks) != 2 {
Expand Down
109 changes: 55 additions & 54 deletions executor.go

Large diffs are not rendered by default.

47 changes: 24 additions & 23 deletions executor_test.go
@@ -1,6 +1,7 @@
package pilosa_test

import (
"context"
"reflect"
"strings"
"testing"
Expand All @@ -22,7 +23,7 @@ func TestExecutor_Execute_Bitmap(t *testing.T) {
}

e := NewExecutor(idx.Index, NewCluster(1))
if res, err := e.Execute("d", MustParse(`Bitmap(id=10, frame=f)`), nil, nil); err != nil {
if res, err := e.Execute(context.Background(), "d", MustParse(`Bitmap(id=10, frame=f)`), nil, nil); err != nil {
t.Fatal(err)
} else if bits := res[0].(*pilosa.Bitmap).Bits(); !reflect.DeepEqual(bits, []uint64{3, SliceWidth + 1}) {
t.Fatalf("unexpected bits: %+v", bits)
Expand All @@ -42,7 +43,7 @@ func TestExecutor_Execute_Difference(t *testing.T) {
idx.MustCreateFragmentIfNotExists("d", "general", 0).MustSetBits(11, 4)

e := NewExecutor(idx.Index, NewCluster(1))
if res, err := e.Execute("d", MustParse(`Difference(Bitmap(id=10), Bitmap(id=11))`), nil, nil); err != nil {
if res, err := e.Execute(context.Background(), "d", MustParse(`Difference(Bitmap(id=10), Bitmap(id=11))`), nil, nil); err != nil {
t.Fatal(err)
} else if bits := res[0].(*pilosa.Bitmap).Bits(); !reflect.DeepEqual(bits, []uint64{1, 3}) {
t.Fatalf("unexpected bits: %+v", bits)
Expand All @@ -62,7 +63,7 @@ func TestExecutor_Execute_Intersect(t *testing.T) {
idx.MustCreateFragmentIfNotExists("d", "general", 1).MustSetBits(11, SliceWidth+2)

e := NewExecutor(idx.Index, NewCluster(1))
if res, err := e.Execute("d", MustParse(`Intersect(Bitmap(id=10), Bitmap(id=11))`), nil, nil); err != nil {
if res, err := e.Execute(context.Background(), "d", MustParse(`Intersect(Bitmap(id=10), Bitmap(id=11))`), nil, nil); err != nil {
t.Fatal(err)
} else if bits := res[0].(*pilosa.Bitmap).Bits(); !reflect.DeepEqual(bits, []uint64{1, SliceWidth + 2}) {
t.Fatalf("unexpected bits: %+v", bits)
Expand All @@ -81,7 +82,7 @@ func TestExecutor_Execute_Union(t *testing.T) {
idx.MustCreateFragmentIfNotExists("d", "general", 1).MustSetBits(11, SliceWidth+2)

e := NewExecutor(idx.Index, NewCluster(1))
if res, err := e.Execute("d", MustParse(`Union(Bitmap(id=10), Bitmap(id=11))`), nil, nil); err != nil {
if res, err := e.Execute(context.Background(), "d", MustParse(`Union(Bitmap(id=10), Bitmap(id=11))`), nil, nil); err != nil {
t.Fatal(err)
} else if bits := res[0].(*pilosa.Bitmap).Bits(); !reflect.DeepEqual(bits, []uint64{0, 2, SliceWidth + 1, SliceWidth + 2}) {
t.Fatalf("unexpected bits: %+v", bits)
Expand All @@ -97,7 +98,7 @@ func TestExecutor_Execute_Count(t *testing.T) {
idx.MustCreateFragmentIfNotExists("d", "f", 1).MustSetBits(10, SliceWidth+2)

e := NewExecutor(idx.Index, NewCluster(1))
if res, err := e.Execute("d", MustParse(`Count(Bitmap(id=10, frame=f))`), nil, nil); err != nil {
if res, err := e.Execute(context.Background(), "d", MustParse(`Count(Bitmap(id=10, frame=f))`), nil, nil); err != nil {
t.Fatal(err)
} else if res[0] != uint64(3) {
t.Fatalf("unexpected n: %d", res[0])
Expand All @@ -115,7 +116,7 @@ func TestExecutor_Execute_SetBit(t *testing.T) {
t.Fatalf("unexpected bitmap count: %d", n)
}

if res, err := e.Execute("d", MustParse(`SetBit(id=11, frame=f, profileID=1)`), nil, nil); err != nil {
if res, err := e.Execute(context.Background(), "d", MustParse(`SetBit(id=11, frame=f, profileID=1)`), nil, nil); err != nil {
t.Fatal(err)
} else {
if !res[0].(bool) {
Expand All @@ -126,7 +127,7 @@ func TestExecutor_Execute_SetBit(t *testing.T) {
if n := f.Bitmap(11).Count(); n != 1 {
t.Fatalf("unexpected bitmap count: %d", n)
}
if res, err := e.Execute("d", MustParse(`SetBit(id=11, frame=f, profileID=1)`), nil, nil); err != nil {
if res, err := e.Execute(context.Background(), "d", MustParse(`SetBit(id=11, frame=f, profileID=1)`), nil, nil); err != nil {
t.Fatal(err)
} else {
if res[0].(bool) {
Expand All @@ -143,16 +144,16 @@ func TestExecutor_Execute_SetBitmapAttrs(t *testing.T) {
// Set two fields on f/10.
// Also set fields on other bitmaps and frames to test isolation.
e := NewExecutor(idx.Index, NewCluster(1))
if _, err := e.Execute("d", MustParse(`SetBitmapAttrs(id=10, frame=f, foo="bar")`), nil, nil); err != nil {
if _, err := e.Execute(context.Background(), "d", MustParse(`SetBitmapAttrs(id=10, frame=f, foo="bar")`), nil, nil); err != nil {
t.Fatal(err)
}
if _, err := e.Execute("d", MustParse(`SetBitmapAttrs(id=200, frame=f, YYY=1)`), nil, nil); err != nil {
if _, err := e.Execute(context.Background(), "d", MustParse(`SetBitmapAttrs(id=200, frame=f, YYY=1)`), nil, nil); err != nil {
t.Fatal(err)
}
if _, err := e.Execute("d", MustParse(`SetBitmapAttrs(id=10, frame=XXX, YYY=1)`), nil, nil); err != nil {
if _, err := e.Execute(context.Background(), "d", MustParse(`SetBitmapAttrs(id=10, frame=XXX, YYY=1)`), nil, nil); err != nil {
t.Fatal(err)
}
if _, err := e.Execute("d", MustParse(`SetBitmapAttrs(id=10, frame=f, baz=123, bat=true)`), nil, nil); err != nil {
if _, err := e.Execute(context.Background(), "d", MustParse(`SetBitmapAttrs(id=10, frame=f, baz=123, bat=true)`), nil, nil); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -182,7 +183,7 @@ func TestExecutor_Execute_TopN(t *testing.T) {

// Execute query.
e := NewExecutor(idx.Index, NewCluster(1))
if result, err := e.Execute("d", MustParse(`TopN(frame=f, n=2)`), nil, nil); err != nil {
if result, err := e.Execute(context.Background(), "d", MustParse(`TopN(frame=f, n=2)`), nil, nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(result[0], []pilosa.Pair{
{Key: 0, Count: 5},
Expand All @@ -205,7 +206,7 @@ func TestExecutor_Execute_TopN_fill(t *testing.T) {

// Execute query.
e := NewExecutor(idx.Index, NewCluster(1))
if result, err := e.Execute("d", MustParse(`TopN(frame=f, n=1)`), nil, nil); err != nil {
if result, err := e.Execute(context.Background(), "d", MustParse(`TopN(frame=f, n=1)`), nil, nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(result, []interface{}{[]pilosa.Pair{
{Key: 0, Count: 4},
Expand Down Expand Up @@ -236,7 +237,7 @@ func TestExecutor_Execute_TopN_Src(t *testing.T) {

// Execute query.
e := NewExecutor(idx.Index, NewCluster(1))
if result, err := e.Execute("d", MustParse(`TopN(Bitmap(id=100, frame=other), frame=f, n=3)`), nil, nil); err != nil {
if result, err := e.Execute(context.Background(), "d", MustParse(`TopN(Bitmap(id=100, frame=other), frame=f, n=3)`), nil, nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(result, []interface{}{[]pilosa.Pair{
{Key: 20, Count: 3},
Expand All @@ -258,7 +259,7 @@ func TestExecutor_Execute_Range(t *testing.T) {
}

e := NewExecutor(idx.Index, NewCluster(1))
if res, err := e.Execute("d", MustParse(`Range(id=1, frame=f.t, start="2000-01-01T00:00", end="2000-01-01T01:00")`), nil, nil); err != nil {
if res, err := e.Execute(context.Background(), "d", MustParse(`Range(id=1, frame=f.t, start="2000-01-01T00:00", end="2000-01-01T01:00")`), nil, nil); err != nil {
t.Fatal(err)
} else if bits := res[0].(*pilosa.Bitmap).Bits(); !reflect.DeepEqual(bits, []uint64{100}) {
t.Fatalf("unexpected bits: %+v", bits)
Expand All @@ -275,7 +276,7 @@ func TestExecutor_Execute_Remote_Bitmap(t *testing.T) {
c.Nodes[1].Host = s.Host()

// Mock secondary server's executor to verify arguments and return a bitmap.
s.Handler.Executor.ExecuteFn = func(db string, query *pql.Query, slices []uint64, opt *pilosa.ExecOptions) ([]interface{}, error) {
s.Handler.Executor.ExecuteFn = func(ctx context.Context, db string, query *pql.Query, slices []uint64, opt *pilosa.ExecOptions) ([]interface{}, error) {
if db != `d` {
t.Fatalf("unexpected db: %s", db)
} else if query.String() != `Bitmap(id=10, frame=f)` {
Expand All @@ -300,7 +301,7 @@ func TestExecutor_Execute_Remote_Bitmap(t *testing.T) {
idx.MustCreateFragmentIfNotExists("d", "f", 1).MustSetBits(10, (1*SliceWidth)+1)

e := NewExecutor(idx.Index, c)
if res, err := e.Execute("d", MustParse(`Bitmap(id=10, frame=f)`), nil, nil); err != nil {
if res, err := e.Execute(context.Background(), "d", MustParse(`Bitmap(id=10, frame=f)`), nil, nil); err != nil {
t.Fatal(err)
} else if bits := res[0].(*pilosa.Bitmap).Bits(); !reflect.DeepEqual(bits, []uint64{1, 2, (1 * SliceWidth) + 1, 2*SliceWidth + 4}) {
t.Fatalf("unexpected bits: %+v", bits)
Expand All @@ -317,7 +318,7 @@ func TestExecutor_Execute_Remote_Count(t *testing.T) {
c.Nodes[1].Host = s.Host()

// Mock secondary server's executor to return a count.
s.Handler.Executor.ExecuteFn = func(db string, query *pql.Query, slices []uint64, opt *pilosa.ExecOptions) ([]interface{}, error) {
s.Handler.Executor.ExecuteFn = func(ctx context.Context, db string, query *pql.Query, slices []uint64, opt *pilosa.ExecOptions) ([]interface{}, error) {
return []interface{}{uint64(10)}, nil
}

Expand All @@ -328,7 +329,7 @@ func TestExecutor_Execute_Remote_Count(t *testing.T) {
idx.MustCreateFragmentIfNotExists("d", "f", 1).MustSetBits(10, (1*SliceWidth)+2)

e := NewExecutor(idx.Index, c)
if res, err := e.Execute("d", MustParse(`Count(Bitmap(id=10, frame=f))`), nil, nil); err != nil {
if res, err := e.Execute(context.Background(), "d", MustParse(`Count(Bitmap(id=10, frame=f))`), nil, nil); err != nil {
t.Fatal(err)
} else if res[0] != uint64(12) {
t.Fatalf("unexpected n: %d", res[0])
Expand All @@ -347,7 +348,7 @@ func TestExecutor_Execute_Remote_SetBit(t *testing.T) {

// Mock secondary server's executor to verify arguments.
var remoteCalled bool
s.Handler.Executor.ExecuteFn = func(db string, query *pql.Query, slices []uint64, opt *pilosa.ExecOptions) ([]interface{}, error) {
s.Handler.Executor.ExecuteFn = func(ctx context.Context, db string, query *pql.Query, slices []uint64, opt *pilosa.ExecOptions) ([]interface{}, error) {
if db != `d` {
t.Fatalf("unexpected db: %s", db)
} else if query.String() != `SetBit(id=10, frame=f, profileID=2)` {
Expand All @@ -362,7 +363,7 @@ func TestExecutor_Execute_Remote_SetBit(t *testing.T) {
defer idx.Close()

e := NewExecutor(idx.Index, c)
if _, err := e.Execute("d", MustParse(`SetBit(id=10, frame=f, profileID=2)`), nil, nil); err != nil {
if _, err := e.Execute(context.Background(), "d", MustParse(`SetBit(id=10, frame=f, profileID=2)`), nil, nil); err != nil {
t.Fatal(err)
}

Expand All @@ -386,7 +387,7 @@ func TestExecutor_Execute_Remote_TopN(t *testing.T) {

// Mock secondary server's executor to verify arguments and return a bitmap.
var remoteExecN int
s.Handler.Executor.ExecuteFn = func(db string, query *pql.Query, slices []uint64, opt *pilosa.ExecOptions) ([]interface{}, error) {
s.Handler.Executor.ExecuteFn = func(ctx context.Context, db string, query *pql.Query, slices []uint64, opt *pilosa.ExecOptions) ([]interface{}, error) {
if db != `d` {
t.Fatalf("unexpected db: %s", db)
} else if !reflect.DeepEqual(slices, []uint64{0, 2, 4, 6}) {
Expand Down Expand Up @@ -424,7 +425,7 @@ func TestExecutor_Execute_Remote_TopN(t *testing.T) {
idx.MustCreateFragmentIfNotExists("d", "f", 3).MustSetBits(30, (3*SliceWidth)+2)

e := NewExecutor(idx.Index, c)
if res, err := e.Execute("d", MustParse(`TopN(frame=f, n=3)`), nil, nil); err != nil {
if res, err := e.Execute(context.Background(), "d", MustParse(`TopN(frame=f, n=3)`), nil, nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(res, []interface{}{[]pilosa.Pair{
{Key: 0, Count: 5},
Expand Down
7 changes: 4 additions & 3 deletions fragment.go
Expand Up @@ -4,6 +4,7 @@ import (
"archive/tar"
"bufio"
"bytes"
"context"
"crypto/sha1"
"encoding/binary"
"errors"
Expand Down Expand Up @@ -1278,7 +1279,7 @@ func (s *FragmentSyncer) SyncFragment() error {
if err != nil {
return err
}
blocks, err := client.FragmentBlocks(s.Fragment.DB(), s.Fragment.Frame(), s.Fragment.Slice())
blocks, err := client.FragmentBlocks(context.Background(), s.Fragment.DB(), s.Fragment.Frame(), s.Fragment.Slice())
if err != nil && err != ErrFragmentNotFound {
return err
}
Expand Down Expand Up @@ -1359,7 +1360,7 @@ func (s *FragmentSyncer) syncBlock(id int) error {
}
clients = append(clients, client)

bitmapIDs, profileIDs, err := client.BlockData(f.DB(), f.Frame(), f.Slice(), id)
bitmapIDs, profileIDs, err := client.BlockData(context.Background(), f.DB(), f.Frame(), f.Slice(), id)
if err != nil {
return err
}
Expand Down Expand Up @@ -1405,7 +1406,7 @@ func (s *FragmentSyncer) syncBlock(id int) error {
}

// Execute query.
_, err := clients[i].ExecuteQuery(f.DB(), buf.String(), false)
_, err := clients[i].ExecuteQuery(context.Background(), f.DB(), buf.String(), false)
if err != nil {
return err
}
Expand Down
17 changes: 9 additions & 8 deletions handler.go
@@ -1,6 +1,7 @@
package pilosa

import (
"context"
"encoding/csv"
"encoding/json"
"errors"
Expand Down Expand Up @@ -31,7 +32,7 @@ type Handler struct {

// The execution engine for running queries.
Executor interface {
Execute(db string, query *pql.Query, slices []uint64, opt *ExecOptions) ([]interface{}, error)
Execute(context context.Context, db string, query *pql.Query, slices []uint64, opt *ExecOptions) ([]interface{}, error)
}

// The version to report on the /version endpoint.
Expand Down Expand Up @@ -215,7 +216,7 @@ func (h *Handler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
}

// Execute the query.
results, err := h.Executor.Execute(req.DB, q, req.Slices, opt)
results, err := h.Executor.Execute(r.Context(), req.DB, q, req.Slices, opt)
resp := &QueryResponse{Results: results, Err: err}

// Fill profile attributes if requested.
Expand Down Expand Up @@ -570,7 +571,7 @@ func (h *Handler) handlePostImport(w http.ResponseWriter, r *http.Request) {
}

// Find the correct fragment.
h.logger().Println("Go Import:", db, frame, slice)
h.logger().Println("importing:", db, frame, slice)
f, err := h.Index.CreateFragmentIfNotExists(db, frame, slice)
if err != nil {
h.logger().Printf("fragment error: db=%s, frame=%s, slice=%d, err=%s", db, frame, slice, err)
Expand Down Expand Up @@ -815,7 +816,7 @@ func (h *Handler) handlePostFrameRestore(w http.ResponseWriter, r *http.Request)
}

// Determine the maximum number of slices.
sliceN, err := client.SliceN()
sliceN, err := client.SliceN(r.Context())
if err != nil {
http.Error(w, "cannot determine remote slice count: "+err.Error(), http.StatusInternalServerError)
return
Expand All @@ -836,18 +837,18 @@ func (h *Handler) handlePostFrameRestore(w http.ResponseWriter, r *http.Request)
}

// Stream backup from remote node.
r, err := client.BackupSlice(db, frame, slice)
rd, err := client.BackupSlice(r.Context(), db, frame, slice)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
} else if r == nil {
} else if rd == nil {
continue // slice doesn't exist
}

// Restore to local frame and always close reader.
if err := func() error {
defer r.Close()
if _, err := f.ReadFrom(r); err != nil {
defer rd.Close()
if _, err := f.ReadFrom(rd); err != nil {
return err
}
return nil
Expand Down