Skip to content
This repository has been archived by the owner on May 22, 2020. It is now read-only.

Get BulkOp implementation #61

Merged
merged 8 commits into from
Oct 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
test:
testing:
PKG_TEST=testing go test ./...
44 changes: 44 additions & 0 deletions bulk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package bucket

import (
"context"
"reflect"

"github.com/couchbase/gocb"
)

func (h *Handler) GetBulk(ctx context.Context, hits []gocb.SearchResultHit, container interface{}) error {
var items []gocb.BulkOp
rv := reflect.ValueOf(container)
if rv.Type().Kind() != reflect.Ptr {
return ErrInvalidBulkContainer
}

rvElem := rv.Elem()
switch rvElem.Kind() {
case reflect.Slice:
if rvElem.Len() != len(hits) {
return ErrInvalidBulkContainer
}
for i := 0; i < rvElem.Len(); i++ {
typs, err := getDocumentTypes(rvElem.Index(i).Addr().Interface())
if err != nil {
return err
}
items = append(items, &gocb.GetOp{Key: hits[i].Id, Value: rvElem.Index(i).Addr().Interface()})
identifier := h.state.fetchDocumentIdentifier(hits[i].Id)
addressableFields := getStructAddressableSubfields(rvElem.Index(i).Addr())
for _, typ := range typs {
documentKey, err := h.state.getDocumentKey(typ, identifier)
if err != nil {
return err
}
items = append(items, &gocb.GetOp{Key: documentKey, Value: addressableFields[typ]})
}
}
default:
return ErrInvalidBulkContainer
}

return h.state.bucket.Do(items)
}
80 changes: 80 additions & 0 deletions bulk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package bucket

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"github.com/rs/xid"
)

func TestGetBulk(t *testing.T) {
for i := 0; i < 10; i++ {
ws := generate()
_, _, err := th.Insert(context.Background(), "webshop", xid.New().String(), ws, 0)
//_, err := th.state.bucket.Insert("order::"+order.Token, order, 0)
if err != nil {
t.Fatal(err)
}
}

if err := createFullTextSearchIndex("webshop_fts_idx", false); err != nil {
t.Fatal(err)
}

searchMatch := "processed"
res, err := th.SimpleSearch(context.Background(), "webshop_fts_idx", &SearchQuery{
Query: searchMatch,
})
if err != nil {
t.Fatal(err)
}

var ws = make([]webshop, len(res))
err = th.GetBulk(context.Background(), res, &ws)
if err != nil {
t.Error(err)
}
if len(ws) > 0 {
assert.Equal(t, "processed", ws[0].Status)
assert.Equal(t, "Free shipping", ws[0].ShippingMethod)
assert.Equal(t, "active", ws[0].Product.Status)
assert.Equal(t, "productshop", ws[0].Store.Name)
} else {
t.Errorf("Empty resultset of the search")
}
}

func BenchmarkGetBulk(b *testing.B) {
b.StopTimer()
for i := 0; i < 10; i++ {
ws := generate()
_, _, err := th.Insert(context.Background(), "webshop", xid.New().String(), ws, 0)
//_, err := th.state.bucket.Insert("order::"+order.Token, order, 0)
if err != nil {
b.Fatal(err)
}
}

if err := createFullTextSearchIndex("webshop_fts_idx", false); err != nil {
b.Fatal(err)
}

b.StartTimer()
for i := 0; i < b.N; i++ {
searchMatch := "processed"
res, err := th.SimpleSearch(context.Background(), "webshop_fts_idx", &SearchQuery{
Query: searchMatch,
})
if err != nil {
b.Fatal(err)
}

var ws = make([]webshop, len(res))
err = th.GetBulk(context.Background(), res, &ws)
if err != nil {
b.Error(err)
}
}
}
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ var (

ErrFirstParameterNotStruct = errors.New("first parameter is not a struct")
ErrInputStructPointer = errors.New("input struct must be pointer")

ErrInvalidBulkContainer = errors.New("container must be *[]T, with length of ids array")
ErrInvalidGetDocumentTypesParam = errors.New("internal error: value should be pointer for getDocumentTypes")
)
52 changes: 37 additions & 15 deletions fts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bucket
import (
"context"
"fmt"
"reflect"
"time"

"github.com/couchbase/gocb"
Expand Down Expand Up @@ -78,8 +79,11 @@ func (h *Handler) SimpleSearch(ctx context.Context, index string, q *SearchQuery
}

query := gocb.NewSearchQuery(index, q).Limit(q.Limit).Skip(q.Offset)
hits, _, err := h.doSearch(ctx, query)
return hits, err
status, result, _, err := h.doSearch(ctx, query)
if status.Errors != nil && !reflect.ValueOf(status.Errors).IsNil() {
return nil, fmt.Errorf("%+v", status.Errors)
}
return result, err
}

func (h *Handler) SimpleSearchWithFacets(ctx context.Context, index string, q *SearchQuery, facets []FacetDef) ([]gocb.SearchResultHit, map[string]gocb.SearchResultFacet, error) {
Expand All @@ -93,8 +97,11 @@ func (h *Handler) SimpleSearchWithFacets(ctx context.Context, index string, q *S

query := gocb.NewSearchQuery(index, q).Limit(q.Limit).Skip(q.Offset)
h.addFacets(ctx, query, facets)

return h.doSearch(ctx, query)
status, result, facetResult, err := h.doSearch(ctx, query)
if status.Errors != nil && !reflect.ValueOf(status.Errors).IsNil() {
return nil, nil, fmt.Errorf("%+v", status.Errors)
}
return result, facetResult, err
}

func (h *Handler) CompoundSearch(ctx context.Context, index string, q *CompoundQueries) ([]gocb.SearchResultHit, error) {
Expand All @@ -107,7 +114,10 @@ func (h *Handler) CompoundSearch(ctx context.Context, index string, q *CompoundQ
}

query := gocb.NewSearchQuery(index, q).Limit(q.Limit).Skip(q.Offset)
result, _, err := h.doSearch(ctx, query)
status, result, _, err := h.doSearch(ctx, query)
if status.Errors != nil && !reflect.ValueOf(status.Errors).IsNil() {
return nil, fmt.Errorf("%+v", status.Errors)
}
return result, err
}

Expand All @@ -122,7 +132,10 @@ func (h *Handler) CompoundSearchWithFacets(ctx context.Context, index string, q

query := gocb.NewSearchQuery(index, q).Limit(q.Limit).Skip(q.Offset)
h.addFacets(ctx, query, facets)
result, facetResult, err := h.doSearch(ctx, query)
status, result, facetResult, err := h.doSearch(ctx, query)
if status.Errors != nil && !reflect.ValueOf(status.Errors).IsNil() {
return nil, nil, fmt.Errorf("%+v", status.Errors)
}
return result, facetResult, err
}

Expand All @@ -136,7 +149,10 @@ func (h *Handler) RangeSearch(ctx context.Context, index string, q *RangeQuery)
}

query := gocb.NewSearchQuery(index, q).Limit(q.Limit).Skip(q.Offset)
result, _, err := h.doSearch(ctx, query)
status, result, _, err := h.doSearch(ctx, query)
if status.Errors != nil && !reflect.ValueOf(status.Errors).IsNil() {
return nil, fmt.Errorf("%+v", status.Errors)
}
return result, err
}

Expand All @@ -151,21 +167,27 @@ func (h *Handler) RangeSearchWithFacets(ctx context.Context, index string, q *Ra

query := gocb.NewSearchQuery(index, q).Limit(q.Limit).Skip(q.Offset)
h.addFacets(ctx, query, facets)
result, facetResult, err := h.doSearch(ctx, query)
status, result, facetResult, err := h.doSearch(ctx, query)
if status.Errors != nil && !reflect.ValueOf(status.Errors).IsNil() {
return nil, nil, fmt.Errorf("%+v", status.Errors)
}
return result, facetResult, err
}

func (h *Handler) doSearch(ctx context.Context, query *gocb.SearchQuery) ([]gocb.SearchResultHit, map[string]gocb.SearchResultFacet, error) {
func (h *Handler) doSearch(ctx context.Context, query *gocb.SearchQuery) (gocb.SearchResultStatus, []gocb.SearchResultHit, map[string]gocb.SearchResultFacet, error) {
res, err := h.state.bucket.ExecuteSearchQuery(query)
if err != nil {
return nil, nil, err
}
fmt.Printf("%+v\n", res.Status())
for i, v := range res.Hits() {
fmt.Printf("%d ---- %+v\n", i, v)
if res != nil {
return res.Status(), nil, nil, err
}
return gocb.SearchResultStatus{}, nil, nil, err
}
//fmt.Printf("%+v\n", res.Status())
//for i, v := range res.Hits() {
// fmt.Printf("%d ---- %+v\n", i, v)
//}

return res.Hits(), res.Facets(), nil
return res.Status(), res.Hits(), res.Facets(), nil
}

func (h *Handler) addFacets(ctx context.Context, query *gocb.SearchQuery, facets []FacetDef) {
Expand Down
22 changes: 1 addition & 21 deletions fts_index_test.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,11 @@
package bucket

import (
"context"
"testing"
)

func TestCreateFullTextSearchIndex(t *testing.T) {
indexName := "order_fts_idx"

if ok, _, _ := th.InspectFullTextSearchIndex(context.Background(), indexName); ok {
err := th.DeleteFullTextSearchIndex(context.Background(), indexName)
if err != nil {
t.Fatal(err)
}
}

def, err := DefaultFullTextSearchIndexDefinition(IndexMeta{
Name: indexName,
SourceType: "couchbase",
SourceName: "company",
DocIDPrefixDelimiter: "::",
})
if err != nil {
t.Fatal(err)
}
err = th.CreateFullTextSearchIndex(context.Background(), def)
if err != nil {
if err := createFullTextSearchIndex("order_fts_idx", true); err != nil {
t.Fatal(err)
}
}
Loading