Skip to content

Commit

Permalink
several enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
nl5887 committed Jan 9, 2018
1 parent c012847 commit 21db55f
Show file tree
Hide file tree
Showing 708 changed files with 21,399 additions and 12,609 deletions.
4 changes: 2 additions & 2 deletions build.sh
@@ -1,3 +1,3 @@
# Build marija for linux/amd64.
# gen-ldflags.go is executed through sign-wrapper.sh (via `go run -exec`) and
# its output is passed to the linker so version metadata is embedded in the
# binary. (The flattened diff above showed the pre-commit lines, which
# computed LDFLAGS but never passed it to `go build`.)
LDFLAGS="$(go run -exec ~/.gopath/bin/sign-wrapper.sh scripts/gen-ldflags.go)"
env GOOS=linux GOARCH=amd64 go build -ldflags "$LDFLAGS" -o ./bin/marija-linux-amd64

42 changes: 33 additions & 9 deletions server/conn.go
Expand Up @@ -46,24 +46,31 @@ const (

InitialStateReceive = "INITIAL_STATE_RECEIVE"

ActionTypeSearchRequest = "SEARCH_REQUEST"
ActionTypeSearchReceive = "SEARCH_RECEIVE"
ActionTypeSearchCanceled = "SEARCH_CANCELED"
ActionTypeSearchCompleted = "SEARCH_COMPLETED"

ActionTypeItemsRequest = "ITEMS_REQUEST"
ActionTypeItemsReceive = "ITEMS_RECEIVE"

ActionTypeIndicesRequest = "INDICES_REQUEST"
ActionTypeIndicesReceive = "INDICES_RECEIVE"

ActionTypeFieldsRequest = "FIELDS_REQUEST"
ActionTypeFieldsReceive = "FIELDS_RECEIVE"
ActionTypeGetFieldsRequest = "FIELDS_REQUEST"
ActionTypeGetFieldsReceive = "FIELDS_RECEIVE"
)

// connection represents a single websocket client attached to the server.
type connection struct {
	// ws is the underlying websocket connection.
	ws *websocket.Conn
	// send buffers outbound messages for the write pump to deliver to the
	// client.
	send chan json.Marshaler
	// b — purpose not evident from this chunk; TODO confirm and rename to
	// something descriptive.
	b int
	// server points back to the owning Server (datasource registry etc.).
	server *Server
	// closed marks the connection as shut down so Send becomes a no-op.
	// NOTE(review): closed appears to be written by the write pump and read
	// by Send on different goroutines with no synchronization — likely a
	// data race; consider sync/atomic or guarding with a mutex.
	closed bool
}

// Send queues v for delivery to the client via the write pump.
// Once the connection has been marked closed, messages are silently dropped.
//
// NOTE(review): checking c.closed and then sending is check-then-act — the
// write pump may mark the connection closed (and stop draining c.send)
// between the check and the send, in which case this call can block
// indefinitely once the channel buffer fills. Confirm the connection
// lifecycle guarantees, or protect closed with a mutex/atomic and use a
// non-blocking send.
func (c *connection) Send(v json.Marshaler) {
	if c.closed {
		return
	}

	c.send <- v
}

Expand Down Expand Up @@ -110,10 +117,11 @@ func (c *connection) readPump() {
}

switch r.Type {
case ActionTypeItemsRequest:
case ActionTypeSearchRequest:
r := SearchRequest{}
if err := json.Unmarshal(data, &r); err != nil {
log.Error("Error occured during search: %s", err.Error())

c.Send(&ErrorMessage{
RequestID: r.RequestID,
Message: err.Error(),
Expand All @@ -126,7 +134,23 @@ func (c *connection) readPump() {
Message: err.Error(),
})
}
case ActionTypeFieldsRequest:
case ActionTypeItemsRequest:
r := ItemsRequest{}
if err := json.Unmarshal(data, &r); err != nil {
log.Error("Error occured retrieving items: %s", err.Error())
c.Send(&ErrorMessage{
RequestID: r.RequestID,
Message: err.Error(),
})
} else if err := c.Items(ctx, r); err != nil {
log.Error("Error occured retrieving items: %s", err.Error())

c.Send(&ErrorMessage{
RequestID: r.RequestID,
Message: err.Error(),
})
}
case ActionTypeGetFieldsRequest:
r := GetFieldsRequest{}
if err := json.Unmarshal(data, &r); err != nil {
log.Error("Error occured during search: %s", err.Error())
Expand Down Expand Up @@ -163,6 +187,7 @@ func (c *connection) writePump() {
select {
case message, ok := <-c.send:
if !ok {
c.closed = true
c.write(websocket.CloseMessage, []byte{})
return
}
Expand Down Expand Up @@ -201,9 +226,8 @@ func (s *Server) serveWs(w http.ResponseWriter, r *http.Request) {
h.register <- c

log.Info("Connection upgraded.")
defer log.Info("Connection closed")

go c.writePump()
c.readPump()

log.Info("Connection closed")
}
61 changes: 23 additions & 38 deletions server/datasources/es5/elasticsearchindexv5.go
Expand Up @@ -109,19 +109,6 @@ func (i *Elasticsearch) Search(ctx context.Context, so datasources.SearchOptions
defer close(itemCh)
defer close(errorCh)

log.Debug(so.Query)

/*
if i.cache == nil {
i.cache = cache.New(5*time.Minute, 10*time.Minute)
} else if v, ok := i.cache.Get(so.Query); !ok {
} else if items, ok := v.([]datasources.Item); !ok {
} else {
itemCh <- items
return
}
*/

hl := elastic.NewHighlight()
hl = hl.Fields(elastic.NewHighlighterField("*").RequireFieldMatch(false).NumOfFragments(0))
hl = hl.PreTags("<em>").PostTags("</em>")
Expand All @@ -141,10 +128,11 @@ func (i *Elasticsearch) Search(ctx context.Context, so datasources.SearchOptions

src := elastic.NewSearchSource().
Query(q).
FetchSource(true).
FetchSource(false).
Highlight(hl).
FetchSource(true).
From(so.From).
Size(100) // so.Size)
Size(100)

if len(scriptFields) > 0 {
src = src.ScriptFields(scriptFields...)
Expand All @@ -157,7 +145,7 @@ func (i *Elasticsearch) Search(ctx context.Context, so datasources.SearchOptions

scroll := i.client.Scroll().Index(index).SearchSource(src)
for {
results, err := scroll.Do()
results, err := scroll.Do(ctx)
if err == io.EOF {
return
} else if err != nil {
Expand Down Expand Up @@ -235,45 +223,42 @@ func flatten(root string, m map[string]interface{}) (fields []datasources.Field)
}
}

fields = append(fields, datasources.Field{
Path: "src-ip_dst-net_port",
Type: "string",
})

fields = append(fields, datasources.Field{
Path: "src-ip_dst-ip_port",
Type: "string",
})

return
}

func (i *Elasticsearch) Fields() (fields []datasources.Field, err error) {
func (i *Elasticsearch) GetFields(ctx context.Context) (fields []datasources.Field, err error) {
index := path.Base(i.URL.Path)
log.Debug("Using index: ", path.Base(i.URL.Path))

exists, err := i.client.IndexExists(index).Do(ctx)
if err != nil {
return nil, err
} else if !exists {
return nil, fmt.Errorf("Index %s doesn't exist", index)
}

mapping, err := i.client.GetMapping().
Index(index).
Do()
Do(ctx)

if err != nil {
return nil, fmt.Errorf("Error retrieving fields for index: %s: %s", index, err.Error())
}

for key := range mapping {
fmt.Printf(key)
}

if mapping[index] == nil {
return nil, fmt.Errorf("Error retrieving fields for index: %s: %s", index, "")
}

mapping = mapping[index].(map[string]interface{})
mapping = mapping["mappings"].(map[string]interface{})
for _, v := range mapping {
// types
fields = append(fields, flatten("", v.(map[string]interface{}))...)
}

fields = append(fields, datasources.Field{
Path: "src-ip_dst-net_port",
Type: "string",
})

fields = append(fields, datasources.Field{
Path: "src-ip_dst-ip_port",
Type: "string",
})

return
}
3 changes: 2 additions & 1 deletion server/datasources/index.go
Expand Up @@ -4,5 +4,6 @@ import "context"

// Index is the contract implemented by every datasource backend
// (Elasticsearch, Twitter, blockchain, ...).
//
// The flattened diff showed both the removed Fields() method and the added
// GetFields(context.Context); only GetFields remains — it is what the es5
// backend implements and what server/getfields.go calls.
type Index interface {
	// Search streams items matching the given options.
	Search(context.Context, SearchOptions) SearchResponse
	// Items(context.Context, ItemsOptions) ItemsResponse
	// GetFields lists the fields this datasource can return.
	GetFields(context.Context) ([]Field, error)
}
1 change: 1 addition & 0 deletions server/datasources/item.go
// Item is a single result row returned by a datasource, serialized to the
// websocket client as JSON.
type Item struct {
	// ID uniquely identifies the item.
	ID string `json:"id"`
	// Fields maps field path to its value for this item.
	Fields map[string]interface{} `json:"fields"`
	// Highlight maps field path to highlighted fragments.
	Highlight map[string][]string `json:"highlight"`
	// Count is the number of occurrences collapsed into this item
	// (added in this commit).
	Count int `json:"count"`
}
7 changes: 3 additions & 4 deletions server/getfields.go
Expand Up @@ -14,14 +14,13 @@ import (
func (c *connection) GetFields(ctx context.Context, r GetFieldsRequest) error {
for _, server := range r.Datasources {
var datasource datasources.Index
if d, ok := c.server.Datasources[server]; !ok {
datasource, ok := c.server.Datasources[server]
if !ok {
log.Errorf("Could not find datasource: %s", server)
continue
} else {
datasource = d
}

fields, err := datasource.Fields()
fields, err := datasource.GetFields(ctx)
if err != nil {
return err
}
Expand Down
120 changes: 120 additions & 0 deletions server/items.go
@@ -0,0 +1,120 @@
package server

import (
"context"
_ "log"

_ "github.com/dutchcoders/marija/server/datasources/blockchain"
_ "github.com/dutchcoders/marija/server/datasources/es5"
_ "github.com/dutchcoders/marija/server/datasources/twitter"
)

// Items handles an ITEMS_REQUEST from a websocket client.
//
// NOTE(review): this is currently a stub. It acknowledges the request with an
// empty ItemsResponse and performs no datasource lookup. The ~85 lines of
// commented-out prototype that previously lived here were removed: dead code
// belongs in version control, and the prototype referenced unimported
// packages (fnv, time, datasources) and hashed field values while iterating a
// map — nondeterministic order would have produced unstable dedup hashes.
// Recover it from history if the streaming implementation is resumed.
func (c *connection) Items(ctx context.Context, r ItemsRequest) error {
	// Acknowledge immediately; no items are produced yet.
	c.Send(&ItemsResponse{
		RequestID: r.RequestID,
	})

	return nil
}

0 comments on commit 21db55f

Please sign in to comment.