Permalink
Browse files

added index and field discovery to backend server, extracted some thi…

…ngs into their own functions
  • Loading branch information...
Raz0rwire committed Nov 1, 2016
1 parent f5555b1 commit 1213f80fadac587a2c930c1caf9874afac4abf1f
Showing with 123 additions and 104 deletions.
  1. +2 −0 .gitignore
  2. +120 −87 conn.go
  3. +1 −17 main.go
View
@@ -0,0 +1,2 @@
.idea
marija-server
View
207 conn.go
@@ -7,50 +7,138 @@ package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"path"
"time"
"gopkg.in/olivere/elastic.v3"
"github.com/gorilla/websocket"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = 1 * time.Second
// Maximum message size allowed from peer.
maxMessageSize = 512
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// connection is an middleman between the websocket connection and the hub.
type connection struct {
// The websocket connection.
ws *websocket.Conn
// Buffered channel of outbound messages.
ws *websocket.Conn
send chan interface{}
}
b int
func (c *connection) reportError(event map[string]interface{}, error error) {
log.Println(error.Error())
c.send <- map[string]interface{}{
"error": map[string]interface{}{
"query": event["query"].(string),
"color": event["color"].(string),
"message": error.Error(),
},
}
}
func (c *connection) connectToEs(event map[string]interface{}) (esClient *elastic.Client, url *url.URL) {
index := event["index"].(string)
url, parseError := url.Parse(index)
if parseError != nil {
return
}
esClient, connectionError := elastic.NewClient(elastic.SetURL(url.Host), elastic.SetSniff(false))
if connectionError != nil {
c.reportError(event, connectionError)
return
}
return esClient, url
}
func (c *connection) search(event map[string]interface{}) {
es, url := c.connectToEs(event)
hl := elastic.NewHighlight()
hl = hl.Fields(elastic.NewHighlighterField("_all").NumOfFragments(0))
hl = hl.PreTags("<em>").PostTags("</em>")
results, err := es.Search().
Index(path.Base(url.Path)).
Highlight(hl).
Query(elastic.NewQueryStringQuery(event["query"].(string))).
From(0).Size(200).
Do()
if err != nil {
c.reportError(event, err)
return
}
c.send <- map[string]interface{}{
"type": "search",
"hits": map[string]interface{}{
"query": event["query"].(string),
"color": event["color"].(string),
"results": results,
},
}
}
func (c *connection) discoverIndices(event map[string]interface{}) {
es, _ := c.connectToEs(event)
results, err := es.IndexStats().
Metric("index").
Do()
if err != nil {
c.reportError(event, err)
return
}
c.send <- map[string]interface{}{
"type": "index_discovery",
"hits": map[string]interface{}{
"server": event["server"].(string),
"indices": results.Indices,
},
}
}
func (c *connection) discoverFields(event map[string]interface{}) {
es, url := c.connectToEs(event)
results, err := es.FieldStats().
Index(path.Base(url.Path)).
Do()
if err != nil {
c.reportError(event, err)
return
}
c.send <- map[string]interface{}{
"type": "field_discovery",
"hits": map[string]interface{}{
"server": event["server"].(string),
"index": event["index"].(string),
"fields": results.Indices,
},
}
}
// readPump pumps messages from the websocket connection to the hub.
func (c *connection) readPump() {
defer func() {
h.unregister <- c
@@ -59,7 +147,9 @@ func (c *connection) readPump() {
c.ws.SetReadLimit(maxMessageSize)
c.ws.SetReadDeadline(time.Now().Add(pongWait))
c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
c.ws.SetPongHandler(func(string) error {
c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil
})
for {
_, message, err := c.ws.ReadMessage()
@@ -70,80 +160,24 @@ func (c *connection) readPump() {
break
}
// check type of message
// mapping
// query
fmt.Println(string(message))
v := map[string]interface{}{}
if err := json.NewDecoder(bytes.NewBuffer(message)).Decode(&v); err != nil {
c.send <- map[string]interface{}{
"error": map[string]interface{}{
"query": v["query"].(string),
"color": v["color"].(string),
"message": err.Error(),
},
}
event := map[string]interface{}{}
if err := json.NewDecoder(bytes.NewBuffer(message)).Decode(&event); err != nil {
c.reportError(event, err)
return
}
func() {
index := v["index"].(string)
u, err := url.Parse(index)
if err != nil {
return
event_type := event["event_type"].(float64)
switch(event_type){
case 1:
c.search(event)
case 2:
c.discoverIndices(event)
case 3:
c.discoverFields(event)
}
es, err := elastic.NewClient(elastic.SetURL(u.Host), elastic.SetSniff(false))
if err != nil {
fmt.Println(err.Error())
c.send <- map[string]interface{}{
"error": map[string]interface{}{
"query": v["query"].(string),
"color": v["color"].(string),
"message": err.Error(),
},
}
return
}
hl := elastic.NewHighlight()
hl = hl.Fields(elastic.NewHighlighterField("_all").NumOfFragments(0))
hl = hl.PreTags("<em>").PostTags("</em>")
results, err := es.Search().
Index(path.Base(u.Path)). // search in index "twitter"
Highlight(hl).
Query(elastic.NewQueryStringQuery(v["query"].(string))). // specify the query
From(c.b).Size(200). // take documents 0-9
Do() // execute
if err != nil {
fmt.Println(err.Error())
c.send <- map[string]interface{}{
"error": map[string]interface{}{
"query": v["query"].(string),
"color": v["color"].(string),
"message": err.Error(),
},
}
return
}
c.send <- map[string]interface{}{
"hits": map[string]interface{}{
"query": v["query"].(string),
"color": v["color"].(string),
"results": results,
},
}
c.b += 10
}()
}
@@ -188,7 +222,6 @@ func (c *connection) writePump() {
}
}
// serveWs handles websocket requests from the peer.
func serveWs(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
View
18 main.go
@@ -8,23 +8,7 @@ import (
var addr = flag.String("addr", "0.0.0.0:8089", "http service address")
/*
func serveHome(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.Error(w, "Not found", 404)
return
}
if r.Method != "GET" {
http.Error(w, "Method not allowed", 405)
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
homeTempl.Execute(w, r.Host)
}
*/
type Packet struct {
}
type Packet struct{}
func main() {
flag.Parse()

0 comments on commit 1213f80

Please sign in to comment.