diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4e8e42f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +#* +*~ diff --git a/bulk.go b/bulk.go new file mode 100644 index 0000000..719fdc4 --- /dev/null +++ b/bulk.go @@ -0,0 +1,135 @@ +package elasticsearch + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net/http" +) + +// Abstract bulk update instruction. +type Instruction interface { + writeTo(w io.Writer) error +} + +// Instruction to update an index entry. +type UpdateInstruction struct { + Id string `json:"_id"` + Index string `json:"_index"` + Type string `json:"_type"` + Routing string `json:"_routing,omitempty"` + Body map[string]interface{} `json:"-"` +} + +func (ui *UpdateInstruction) writeTo(w io.Writer) error { + e := json.NewEncoder(w) + err := e.Encode(map[string]interface{}{ + "index": ui, + }) + if err != nil { + return err + } + err = e.Encode(ui.Body) + return err +} + +// Instruction to delete an item from an index. +type DeleteInstruction struct { + Id string `json:"_id"` + Index string `json:"_index"` + Type string `json:"_type"` + Routing string `json:"_routing,omitempty"` +} + +func (di *DeleteInstruction) writeTo(w io.Writer) error { + e := json.NewEncoder(w) + return e.Encode(map[string]interface{}{ + "delete": di, + }) +} + +type bulkWriter struct { + update chan Instruction + reqch chan chan *http.Request + quit chan bool + w *bytes.Buffer +} + +// Interface for writing bulk data into elasticsearch. +type BulkUpdater interface { + // Update the index with a new record (or delete a record). + Update(ui Instruction) + // Send the current batch. + SendBatch() error + // Shut down this bulk interface + Quit() +} + +func (b *bulkWriter) Update(ui Instruction) { + b.update <- ui +} + +func (b *bulkWriter) SendBatch() error { + reqch := make(chan *http.Request) + b.reqch <- reqch + req := <-reqch + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + // TODO: Parse the response and check each thingy. + if resp.StatusCode > 201 { + return errors.New("HTTP error: " + resp.Status) + } + return nil +} + +func (b *bulkWriter) Quit() { + b.quit <- true +} + +func issueBulkRequest(u string, bw *bulkWriter, reqch chan *http.Request) { + req, err := http.NewRequest("POST", u+"/_bulk", bw.w) + if err != nil { + log.Fatalf("Couldn't make a request: %v\n", err) + } + req.Header.Set("Content-Length", fmt.Sprintf("%d", bw.w.Len())) + req.Header.Set("Content-Type", "application/json") + reqch <- req + bw.w = &bytes.Buffer{} +} + +// Get a bulk updater. +func (e *ElasticSearch) Bulk() BulkUpdater { + + rv := &bulkWriter{ + update: make(chan Instruction), + reqch: make(chan chan *http.Request), + quit: make(chan bool), + w: &bytes.Buffer{}, + } + + go func() { + ever := true + for ever { + select { + case <-rv.quit: + ever = false + case req := <-rv.reqch: + issueBulkRequest(e.URL, rv, req) + case upd := <-rv.update: + err := upd.writeTo(rv.w) + if err != nil { + log.Fatalf("Error sending an update: %v", err) + } + } + } + }() + + return rv +} diff --git a/bytesource.go b/bytesource.go new file mode 100644 index 0000000..151dbeb --- /dev/null +++ b/bytesource.go @@ -0,0 +1,31 @@ +package elasticsearch + +import ( + "io" +) + +type bytesource struct { + ch <-chan []byte + current []byte +} + +func (b *bytesource) Read(out []byte) (int, error) { + if len(b.current) == 0 { + var ok bool + b.current, ok = <-b.ch + if !ok { + return 0, io.EOF + } + } + copied := copy(out, b.current) + b.current = b.current[copied:] + return copied, nil +} + +func (b *bytesource) Close() { + // Can't really do anything here. +} + +func NewByteSource(from <-chan []byte) *bytesource { + return &bytesource{ch: from} +} diff --git a/es.go b/es.go new file mode 100644 index 0000000..7cd889d --- /dev/null +++ b/es.go @@ -0,0 +1,109 @@ +// Go interface to elasticsearch. +package elasticsearch + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" +) + +// Reference to an ElasticSearch server. +type ElasticSearch struct { + // Base URL to elasticsearch + URL string +} + +type response struct { + OK bool `json:"ok"` + Index string `json:"_index"` + Type string `json:"_type"` + Id string `json:"_id"` + Found bool `json:"found"` + Source map[string]interface{} `json:"_source"` +} + +func (es *ElasticSearch) mkURL(index, doctype, id string, + params map[string]string) string { + + paramv := url.Values{} + for k, v := range params { + paramv.Set(k, v) + } + + return fmt.Sprintf("%s/%s/%s/%s?%s", es.URL, + url.QueryEscape(index), + url.QueryEscape(doctype), + url.QueryEscape(id), + paramv.Encode()) +} + +func handleResponse(resp *http.Response) (*response, error) { + defer resp.Body.Close() + if resp.StatusCode != 200 { + return nil, errors.New(resp.Status) + } + + d := json.NewDecoder(resp.Body) + iresp := &response{} + err := d.Decode(iresp) + if err != nil { + return nil, err + } + if !iresp.OK { + return nil, errors.New("Response wasn't OK") + } + + return iresp, nil +} + +// Store a document in the index. +// +// The ID is optional in which case the ID will be generated by the +// server. +// +// Returns the new ID on success, otherwise an error. +func (es *ElasticSearch) Index(index, doctype, id string, + doc map[string]interface{}, + params map[string]string) (string, error) { + + data, err := json.Marshal(doc) + if err != nil { + return "", err + } + + resp, err := http.DefaultClient.Post(es.mkURL(index, doctype, id, params), + "application/json", bytes.NewBuffer(data)) + if err != nil { + return "", err + } + iresp, err := handleResponse(resp) + if err != nil { + return "", err + } + + return iresp.Id, nil +} + +// Delete an index entry. +func (es *ElasticSearch) Delete(index, doctype, id string, + params map[string]string) (bool, error) { + + req, err := http.NewRequest("DELETE", es.mkURL(index, doctype, id, params), + nil) + if err != nil { + return false, err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return false, err + } + iresp, err := handleResponse(resp) + if err != nil { + return false, err + } + + return iresp.Found, nil +}