Skip to content

Commit

Permalink
add combined wildcard feature search
Browse files Browse the repository at this point in the history
  • Loading branch information
IngoRoessner committed Dec 8, 2023
1 parent b6e4c72 commit b781949
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 44 deletions.
2 changes: 2 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
"http_server_timeout": "30s",
"http_server_read_timeout": "3s",

"enable_combined_wildcard_feature_search": true,

"resources": {
"smart_service_releases": {
"features":[
Expand Down
13 changes: 7 additions & 6 deletions lib/configuration/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ type ConfigStruct struct {

ResultModifiers map[string][]ResultModifier `json:"result_modifiers"`

Timeout string `json:"timeout"`
MaxRetry int `json:"max_retry"`
BulkFlushInterval string `json:"bulk_flush_interval"`
IndexTypeMapping map[string]map[string]map[string]interface{} `json:"index_type_mapping"`
BulkWorkerCount int64 `json:"bulk_worker_count"`
UseBulkWorkerForAnnotations bool `json:"use_bulk_worker_for_annotations"`
Timeout string `json:"timeout"`
MaxRetry int `json:"max_retry"`
BulkFlushInterval string `json:"bulk_flush_interval"`
IndexTypeMapping map[string]map[string]map[string]interface{} `json:"index_type_mapping"`
BulkWorkerCount int64 `json:"bulk_worker_count"`
UseBulkWorkerForAnnotations bool `json:"use_bulk_worker_for_annotations"`
EnableCombinedWildcardFeatureSearch bool `json:"enable_combined_wildcard_feature_search"`

JwtPubRsa string `json:"jwt_pub_rsa"`
ForceUser string `json:"force_user"`
Expand Down
63 changes: 35 additions & 28 deletions lib/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,17 +669,12 @@ func (this *Query) SearchRightsToAdministrate(kind string, user string, groups [
}
ctx := this.getTimeout()

searchOperation, searchConfig := this.getFeatureSearchInfo(query)
body := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"filter": getRightsQuery("a", user, groups),
"must": []map[string]interface{}{
{
"match": map[string]interface{}{
"feature_search": map[string]interface{}{"operator": "AND", "query": query},
},
},
},
"must": []map[string]interface{}{{searchOperation: searchConfig}},
},
},
}
Expand Down Expand Up @@ -825,6 +820,33 @@ func (this *Query) selectByField(kind string, field string, value string, user s
return
}

func (this *Query) getFeatureSearchInfo(query string) (operator string, config map[string]interface{}) {
if strings.Contains(query, "*") {
return "wildcard", map[string]interface{}{
"feature_search": map[string]interface{}{"case_insensitive": true, "value": query},
}
}
if !this.config.EnableCombinedWildcardFeatureSearch || strings.ContainsAny(query, " -/_:,;([{&%$") {
return "match", map[string]interface{}{
"feature_search": map[string]interface{}{"operator": "AND", "query": query},
}
}
return "bool", map[string]interface{}{
"should": []map[string]interface{}{
{
"wildcard": map[string]interface{}{
"feature_search": map[string]interface{}{"case_insensitive": true, "value": "*" + query + "*"},
},
},
{
"match": map[string]interface{}{
"feature_search": map[string]interface{}{"operator": "AND", "query": query},
},
},
},
}
}

// SearchList does a text search with query on the feature_search index
// the function allows optionally additional filtering with the selection parameter. when unneeded this parameter may be nil.
func (this *Query) SearchList(token auth.Token, kind string, query string, queryCommons model.QueryListCommons, selection *model.Selection) (result []map[string]interface{}, err error) {
Expand All @@ -836,17 +858,12 @@ func (this *Query) SearchList(token auth.Token, kind string, query string, query
}
filter = append(filter, selectionFilter)
}
searchOperation, searchConfig := this.getFeatureSearchInfo(query)
body := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"filter": filter,
"must": []map[string]interface{}{
{
"match": map[string]interface{}{
"feature_search": map[string]interface{}{"operator": "AND", "query": query},
},
},
},
"must": []map[string]interface{}{{searchOperation: searchConfig}},
},
},
}
Expand Down Expand Up @@ -885,17 +902,12 @@ func (this *Query) SearchList(token auth.Token, kind string, query string, query
func (this *Query) searchList(kind string, query string, user string, groups []string, rights string, limit int, offset int) (result []map[string]interface{}, err error) {
ctx := this.getTimeout()

searchOperation, searchConfig := this.getFeatureSearchInfo(query)
body := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"filter": getRightsQuery(rights, user, groups),
"must": []map[string]interface{}{
{
"match": map[string]interface{}{
"feature_search": map[string]interface{}{"operator": "AND", "query": query},
},
},
},
"must": []map[string]interface{}{{searchOperation: searchConfig}},
},
},
}
Expand Down Expand Up @@ -929,17 +941,12 @@ func (this *Query) searchList(kind string, query string, user string, groups []s
func (this *Query) SearchOrderedList(kind string, query string, user string, groups []string, queryCommons model.QueryListCommons) (result []map[string]interface{}, err error) {
ctx := this.getTimeout()

searchOperation, searchConfig := this.getFeatureSearchInfo(query)
body := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"filter": getRightsQuery(queryCommons.Rights, user, groups),
"must": []map[string]interface{}{
{
"match": map[string]interface{}{
"feature_search": map[string]interface{}{"operator": "AND", "query": query},
},
},
},
"must": []map[string]interface{}{{searchOperation: searchConfig}},
},
},
}
Expand Down
9 changes: 2 additions & 7 deletions lib/query/total.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,12 @@ func (this *Query) Total(tokenStr string, kind string, options model.ListOptions

func (this *Query) SearchListTotal(token auth.Token, kind string, query string, rights string) (result int64, err error) {
filter := getRightsQuery(rights, token.GetUserId(), token.GetRoles())
searchOperation, searchConfig := this.getFeatureSearchInfo(query)
body := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"filter": filter,
"must": []map[string]interface{}{
{
"match": map[string]interface{}{
"feature_search": map[string]interface{}{"operator": "AND", "query": query},
},
},
},
"must": []map[string]interface{}{{searchOperation: searchConfig}},
},
},
}
Expand Down
179 changes: 176 additions & 3 deletions lib/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,107 @@ import (
"time"
)

func BenchmarkSearch(b *testing.B) {
b.Skip("benchmark")
wg := &sync.WaitGroup{}
defer wg.Wait()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

config, err := configuration.LoadConfig("./../config.json")
if err != nil {
b.Error(err)
return
}
config.LogDeprecatedCallsToFile = ""
config.FatalErrHandler = func(v ...interface{}) {
log.Println("TEST-ERROR:", v)
b.Log(v...)
}

config.OpenSearchInsecureSkipVerify = true
config.OpenSearchUsername = "admin"
config.OpenSearchPassword = "admin"

b.Run("start dependency containers", func(b *testing.B) {
_, ip, err := OpenSearch(ctx, wg)
if err != nil {
b.Error(err)
return
}
config.OpenSearchUrls = "https://" + ip + ":9200"

_, zkIp, err := Zookeeper(ctx, wg)
if err != nil {
b.Error(err)
return
}
config.KafkaUrl = zkIp + ":2181"

//kafka
config.KafkaUrl, err = Kafka(ctx, wg, config.KafkaUrl)
if err != nil {
b.Error(err)
return
}
})

b.Run("start server", func(b *testing.B) {
freePort, err := GetFreePort()
if err != nil {
b.Error(err)
return
}
config.ServerPort = strconv.Itoa(freePort)
err = Start(ctx, config, Standalone)
if err != nil {
b.Error(err)
return
}
})

deviceNames := []string{"Plug Kühlschrank Backofen ", "HEAT_COST_ALLOCATOR", "HEAT-COST-ALLOCATOR", "HEAT COST ALLOCATOR", "HeatCostAllocator", "foo", "cator", "heal", "heat"}
/*
for i := len(deviceNames); i < 1000; i++ {
deviceNames = append(deviceNames, uuid.NewString())
}
*/

b.Run("create devices", createSearchTestDevicesForBenchmark(ctx, config, deviceNames...))

time.Sleep(10 * time.Second) //kafka latency

b.ResetTimer()

for n := 0; n < b.N; n++ {
b.Run("check cost", checkDeviceSearchForBenchmark(config, "cost"))
b.Run("check COST", checkDeviceSearchForBenchmark(config, "COST"))
b.Run("check HEAT", checkDeviceSearchForBenchmark(config, "HEAT"))
b.Run("check heat", checkDeviceSearchForBenchmark(config, "heat"))

b.Run("check HEAT_COST", checkDeviceSearchForBenchmark(config, "HEAT_COST"))
b.Run("check HeatCost", checkDeviceSearchForBenchmark(config, "HeatCost"))
b.Run("check COST_ALLOCATOR", checkDeviceSearchForBenchmark(config, "COST_ALLOCATOR"))
b.Run("check CostAllocator", checkDeviceSearchForBenchmark(config, "CostAllocator"))

b.Run("check HEAT-COST", checkDeviceSearchForBenchmark(config, "HEAT-COST"))
b.Run("check Heat-Cost", checkDeviceSearchForBenchmark(config, "Heat-Cost"))
b.Run("check COST-ALLOCATOR", checkDeviceSearchForBenchmark(config, "COST-ALLOCATOR"))
b.Run("check Cost-Allocator", checkDeviceSearchForBenchmark(config, "Cost-Allocator"))

b.Run("check Allo", checkDeviceSearchForBenchmark(config, "Allo"))

b.Run("check Hea", checkDeviceSearchForBenchmark(config, "Hea"))

b.Run("check küh", checkDeviceSearchForBenchmark(config, "küh"))
b.Run("check back", checkDeviceSearchForBenchmark(config, "back"))

b.Run("check CATOR", checkDeviceSearchForBenchmark(config, "CATOR"))
b.Run("check cator", checkDeviceSearchForBenchmark(config, "cator"))
}

}

func TestSearch(t *testing.T) {
if testing.Short() {
t.Skip("short")
Expand Down Expand Up @@ -116,10 +217,14 @@ func TestSearch(t *testing.T) {
t.Run("check Cost-Allocator", checkDeviceSearch(config, "Cost-Allocator", "HEAT_COST_ALLOCATOR", "HEAT-COST-ALLOCATOR", "HEAT COST ALLOCATOR", "HeatCostAllocator"))

t.Run("check Allo", checkDeviceSearch(config, "Allo", "HEAT_COST_ALLOCATOR", "HEAT-COST-ALLOCATOR", "HEAT COST ALLOCATOR", "HeatCostAllocator"))

t.Run("check Hea", checkDeviceSearch(config, "Hea", "HEAT_COST_ALLOCATOR", "HEAT-COST-ALLOCATOR", "HEAT COST ALLOCATOR", "HeatCostAllocator", "heal", "heat"))

t.Run("check küh", checkDeviceSearch(config, "küh", "Plug Kühlschrank Backofen "))
t.Run("check back", checkDeviceSearch(config, "back", "Plug Kühlschrank Backofen "))

t.Run("check CATOR", checkDeviceSearch(config, "CATOR", "HEAT_COST_ALLOCATOR", "HEAT-COST-ALLOCATOR", "HEAT COST ALLOCATOR", "HeatCostAllocator", "cator"))
t.Run("check cator", checkDeviceSearch(config, "cator", "HEAT_COST_ALLOCATOR", "HEAT-COST-ALLOCATOR", "HEAT COST ALLOCATOR", "HeatCostAllocator", "cator"))
}

func checkDeviceSearch(config configuration.Config, searchText string, expectedResultNames ...string) func(t *testing.T) {
Expand Down Expand Up @@ -173,9 +278,47 @@ func checkDeviceSearch(config configuration.Config, searchText string, expectedR
expectedNames := append([]string{}, expectedResultNames...)
sort.Strings(expectedNames)
if !reflect.DeepEqual(expectedNames, actualNames) {
a, _ := json.Marshal(actualNames)
e, _ := json.Marshal(expectedNames)
t.Error(string(a), "\n", string(e))
t.Errorf("\n%#v\n%#v\n", actualNames, expectedNames)
}
}
}

func checkDeviceSearchForBenchmark(config configuration.Config, searchText string) func(b *testing.B) {
return func(b *testing.B) {
method := "POST"
path := "/v3/query"
body := new(bytes.Buffer)
err := json.NewEncoder(body).Encode(model.QueryMessage{
Resource: "devices",
Find: &model.QueryFind{
QueryListCommons: model.QueryListCommons{
Limit: 100,
Offset: 0,
SortBy: "name",
SortDesc: true,
},
Search: searchText,
},
})
if err != nil {
b.Error(err)
return
}
req, err := http.NewRequest(method, "http://localhost:"+config.ServerPort+path, body)
if err != nil {
b.Error(err)
return
}
req.Header.Set("Authorization", testtoken)
resp, err := http.DefaultClient.Do(req)
if err != nil {
b.Error(err)
return
}
if resp.StatusCode != 200 {
temp, _ := io.ReadAll(resp.Body)
b.Error(resp.StatusCode, string(temp))
return
}
}
}
Expand All @@ -197,6 +340,19 @@ func createSearchTestDevices(ctx context.Context, config configuration.Config, n
}
}

func createSearchTestDevicesForBenchmark(ctx context.Context, config configuration.Config, names ...string) func(b *testing.B) {
return func(b *testing.B) {
p, err := k.NewProducer(ctx, config.KafkaUrl, "devices", true)
if err != nil {
b.Error(err)
return
}
for _, name := range names {
b.Run("create "+name, createSearchTestDeviceForBenchmark(p, name))
}
}
}

func createSearchTestDevice(p *k.Producer, name string) func(t *testing.T) {
return func(t *testing.T) {
deviceMsg, deviceCmd, err := getDeviceTestObj(uuid.New().String(), map[string]interface{}{
Expand All @@ -213,3 +369,20 @@ func createSearchTestDevice(p *k.Producer, name string) func(t *testing.T) {
}
}
}

func createSearchTestDeviceForBenchmark(p *k.Producer, name string) func(b *testing.B) {
return func(b *testing.B) {
deviceMsg, deviceCmd, err := getDeviceTestObj(uuid.New().String(), map[string]interface{}{
"name": name,
})
if err != nil {
b.Error(err)
return
}
err = p.Produce(deviceCmd.Id, deviceMsg)
if err != nil {
b.Error(err)
return
}
}
}

0 comments on commit b781949

Please sign in to comment.