Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1677 from yuce/1632-move-columnattrs
Browse files Browse the repository at this point in the history
Fixes #1632
  • Loading branch information
yuce committed Oct 5, 2018
2 parents 2d81d85 + 37d1cfb commit 396ec6e
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 72 deletions.
66 changes: 4 additions & 62 deletions api.go
Expand Up @@ -102,80 +102,22 @@ func (api *API) Query(ctx context.Context, req *QueryRequest) (QueryResponse, er
return QueryResponse{}, errors.Wrap(err, "validating api method")
}

resp := QueryResponse{}

q, err := pql.NewParser(strings.NewReader(req.Query)).Parse()
if err != nil {
return resp, errors.Wrap(err, "parsing")
return QueryResponse{}, errors.Wrap(err, "parsing")
}
execOpts := &execOptions{
Remote: req.Remote,
ExcludeRowAttrs: req.ExcludeRowAttrs, // NOTE: Kept for Pilosa 1.x compat.
ExcludeColumns: req.ExcludeColumns, // NOTE: Kept for Pilosa 1.x compat.
ColumnAttrs: req.ColumnAttrs, // NOTE: Kept for Pilosa 1.x compat.
}
results, err := api.server.executor.Execute(ctx, req.Index, q, req.Shards, execOpts)
resp, err := api.server.executor.Execute(ctx, req.Index, q, req.Shards, execOpts)
if err != nil {
return resp, errors.Wrap(err, "executing")
}
resp.Results = results

// Fill column attributes if requested.
// execOpts.ColumnAttrs may be set by the Execute method if any of the Calls use Options(columnAttrs=true)
if execOpts.ColumnAttrs {
// Consolidate all column ids across all calls.
var columnIDs []uint64
for _, result := range results {
bm, ok := result.(*Row)
if !ok {
continue
}
columnIDs = uint64Slice(columnIDs).merge(bm.Columns())
}

// Retrieve column attributes across all calls.
columnAttrSets, err := api.readColumnAttrSets(api.holder.Index(req.Index), columnIDs)
if err != nil {
return resp, errors.Wrap(err, "reading column attrs")
}

// Translate column attributes, if necessary.
if api.holder.translateFile != nil {
for _, col := range resp.ColumnAttrSets {
v, err := api.holder.translateFile.TranslateColumnToString(req.Index, col.ID)
if err != nil {
return resp, err
}
col.Key, col.ID = v, 0
}
}

resp.ColumnAttrSets = columnAttrSets
}
return resp, nil
}

// readColumnAttrSets returns a list of column attribute objects by id.
func (api *API) readColumnAttrSets(index *Index, ids []uint64) ([]*ColumnAttrSet, error) {
if index == nil {
return nil, nil
return QueryResponse{}, errors.Wrap(err, "executing")
}

ax := make([]*ColumnAttrSet, 0, len(ids))
for _, id := range ids {
// Read attributes for column. Skip column if empty.
attrs, err := index.ColumnAttrStore().Attrs(id)
if err != nil {
return nil, errors.Wrap(err, "getting attrs")
} else if len(attrs) == 0 {
continue
}

// Append column with attributes.
ax = append(ax, &ColumnAttrSet{ID: id, Attrs: attrs})
}

return ax, nil
return resp, nil
}

// CreateIndex makes a new Pilosa index.
Expand Down
77 changes: 68 additions & 9 deletions executor.go
Expand Up @@ -79,20 +79,21 @@ func newExecutor(opts ...executorOption) *executor {
}

// Execute executes a PQL query.
func (e *executor) Execute(ctx context.Context, index string, q *pql.Query, shards []uint64, opt *execOptions) ([]interface{}, error) {
func (e *executor) Execute(ctx context.Context, index string, q *pql.Query, shards []uint64, opt *execOptions) (QueryResponse, error) {
resp := QueryResponse{}
// Verify that an index is set.
if index == "" {
return nil, ErrIndexRequired
return resp, ErrIndexRequired
}

idx := e.Holder.Index(index)
if idx == nil {
return nil, ErrIndexNotFound
return resp, ErrIndexNotFound
}

// Verify that the number of writes do not exceed the maximum.
if e.MaxWritesPerRequest > 0 && q.WriteCallN() > e.MaxWritesPerRequest {
return nil, ErrTooManyWrites
return resp, ErrTooManyWrites
}

// Default options.
Expand All @@ -105,14 +106,48 @@ func (e *executor) Execute(ctx context.Context, index string, q *pql.Query, shar
if !opt.Remote {
for i := range q.Calls {
if err := e.translateCall(index, idx, q.Calls[i]); err != nil {
return nil, err
return resp, err
}
}
}

results, err := e.execute(ctx, index, q, shards, opt)
if err != nil {
return nil, err
return resp, err
}

resp.Results = results

// Fill column attributes if requested.
if opt.ColumnAttrs {
// Consolidate all column ids across all calls.
var columnIDs []uint64
for _, result := range results {
bm, ok := result.(*Row)
if !ok {
continue
}
columnIDs = uint64Slice(columnIDs).merge(bm.Columns())
}

// Retrieve column attributes across all calls.
columnAttrSets, err := e.readColumnAttrSets(e.Holder.Index(index), columnIDs)
if err != nil {
return resp, errors.Wrap(err, "reading column attrs")
}

// Translate column attributes, if necessary.
if idx.Keys() {
for _, col := range columnAttrSets {
v, err := e.Holder.translateFile.TranslateColumnToString(index, col.ID)
if err != nil {
return resp, err
}
col.Key, col.ID = v, 0
}
}

resp.ColumnAttrSets = columnAttrSets
}

// Translate response objects from ids to keys, if necessary.
Expand All @@ -121,11 +156,35 @@ func (e *executor) Execute(ctx context.Context, index string, q *pql.Query, shar
for i := range results {
results[i], err = e.translateResult(index, idx, q.Calls[i], results[i])
if err != nil {
return nil, err
return resp, err
}
}
}
return results, nil

return resp, nil
}

// readColumnAttrSets returns a list of column attribute objects by id.
func (e *executor) readColumnAttrSets(index *Index, ids []uint64) ([]*ColumnAttrSet, error) {
if index == nil {
return nil, nil
}

ax := make([]*ColumnAttrSet, 0, len(ids))
for _, id := range ids {
// Read attributes for column. Skip column if empty.
attrs, err := index.ColumnAttrStore().Attrs(id)
if err != nil {
return nil, errors.Wrap(err, "getting attrs")
} else if len(attrs) == 0 {
continue
}

// Append column with attributes.
ax = append(ax, &ColumnAttrSet{ID: id, Attrs: attrs})
}

return ax, nil
}

func (e *executor) execute(ctx context.Context, index string, q *pql.Query, shards []uint64, opt *execOptions) ([]interface{}, error) {
Expand Down Expand Up @@ -1799,7 +1858,7 @@ func (e *executor) mapperLocal(ctx context.Context, shards []uint64, mapFn mapFu
func (e *executor) translateCall(index string, idx *Index, c *pql.Call) error {
var colKey, rowKey, fieldName string
switch c.Name {
case "Set", "Clear", "Row", "Range":
case "Set", "Clear", "Row", "Range", "SetColumnAttrs":
// Positional args in new PQL syntax require special handling here.
colKey = "_" + columnLabel
fieldName, _ = c.FieldArg()
Expand Down
30 changes: 30 additions & 0 deletions executor_test.go
Expand Up @@ -1561,6 +1561,36 @@ func TestExecutor_QueryCall(t *testing.T) {
}
})

t.Run("columnAttrsWithKeys", func(t *testing.T) {
c := test.MustRunCluster(t, 1)
defer c.Close()
hldr := test.Holder{Holder: c[0].Server.Holder()}

// Set columns for rows 0, 10, & 20 across two shards.
if idx, err := hldr.CreateIndex("i", pilosa.IndexOptions{Keys: true}); err != nil {
t.Fatal(err)
} else if _, err := idx.CreateField("f", pilosa.OptFieldKeys()); err != nil {
t.Fatal(err)
} else if _, err := c[0].API.Query(context.Background(), &pilosa.QueryRequest{Index: "i", Query: `
Set("one-hundred", f="ten")
SetColumnAttrs("one-hundred", foo="bar")
`}); err != nil {
t.Fatal(err)
}

targetColAttrSets := []*pilosa.ColumnAttrSet{
{Key: "one-hundred", Attrs: map[string]interface{}{"foo": "bar"}},
}

if res, err := c[0].API.Query(context.Background(), &pilosa.QueryRequest{Index: "i", Query: `Options(Row(f="ten"), columnAttrs=true)`}); err != nil {
t.Fatal(err)
} else if keys := res.Results[0].(*pilosa.Row).Keys; !reflect.DeepEqual(keys, []string{"one-hundred"}) {
t.Fatalf("unexpected keys: %+v", keys)
} else if attrs := res.ColumnAttrSets; !reflect.DeepEqual(attrs, targetColAttrSets) {
t.Fatalf("unexpected attrs: %s", spew.Sdump(attrs))
}
})

t.Run("shards", func(t *testing.T) {
c := test.MustRunCluster(t, 1)
defer c.Close()
Expand Down
2 changes: 1 addition & 1 deletion pilosa.go
Expand Up @@ -117,7 +117,7 @@ var nameRegexp = regexp.MustCompile(`^[a-z][a-z0-9_-]{0,63}$`)
// ColumnAttrSet represents a set of attributes for a vertical column in an index.
// Can have a set of attributes attached to it.
type ColumnAttrSet struct {
ID uint64 `json:"id"`
ID uint64 `json:"id,omitempty"`
Key string `json:"key,omitempty"`
Attrs map[string]interface{} `json:"attrs,omitempty"`
}
Expand Down

0 comments on commit 396ec6e

Please sign in to comment.