Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Elasticsearch basic update stuff.
  • Loading branch information
dustin committed Mar 26, 2012
0 parents commit db83d8a
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
#*
*~
135 changes: 135 additions & 0 deletions bulk.go
@@ -0,0 +1,135 @@
package elasticsearch

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
)

// Abstract bulk update instruction.
type Instruction interface {
writeTo(w io.Writer) error
}

// Instruction to update an index entry.
type UpdateInstruction struct {
Id string `json:"_id"`
Index string `json:"_index"`
Type string `json:"_type"`
Routing string `json:"_routing,omitempty"`
Body map[string]interface{} `json:"-"`
}

func (ui *UpdateInstruction) writeTo(w io.Writer) error {
e := json.NewEncoder(w)
err := e.Encode(map[string]interface{}{
"index": ui,
})
if err != nil {
return err
}
err = e.Encode(ui.Body)
return err
}

// Instruction to delete an item from an index.
type DeleteInstruction struct {
Id string `json:"_id"`
Index string `json:"_index"`
Type string `json:"_type"`
Routing string `json:"_routing,omitempty"`
}

func (di *DeleteInstruction) writeTo(w io.Writer) error {
e := json.NewEncoder(w)
return e.Encode(map[string]interface{}{
"delete": di,
})
}

type bulkWriter struct {
update chan Instruction
reqch chan chan *http.Request
quit chan bool
w *bytes.Buffer
}

// Interface for writing bulk data into elasticsearch.
type BulkUpdater interface {
// Update the index with a new record (or delete a record).
Update(ui Instruction)
// Send the current batch.
SendBatch() error
// Shut down this bulk interface
Quit()
}

func (b *bulkWriter) Update(ui Instruction) {
b.update <- ui
}

func (b *bulkWriter) SendBatch() error {
reqch := make(chan *http.Request)
b.reqch <- reqch
req := <-reqch

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// TODO: Parse the response and check each thingy.
if resp.StatusCode > 201 {
return errors.New("HTTP error: " + resp.Status)
}
return nil
}

func (b *bulkWriter) Quit() {
b.quit <- true
}

func issueBulkRequest(u string, bw *bulkWriter, reqch chan *http.Request) {
req, err := http.NewRequest("POST", u+"/_bulk", bw.w)
if err != nil {
log.Fatalf("Couldn't make a request: %v\n", err)
}
req.Header.Set("Content-Length", fmt.Sprintf("%d", bw.w.Len()))
req.Header.Set("Content-Type", "application/json")
reqch <- req
bw.w = &bytes.Buffer{}
}

// Get a bulk updater.
func (e *ElasticSearch) Bulk() BulkUpdater {

rv := &bulkWriter{
update: make(chan Instruction),
reqch: make(chan chan *http.Request),
quit: make(chan bool),
w: &bytes.Buffer{},
}

go func() {
ever := true
for ever {
select {
case <-rv.quit:
ever = false
case req := <-rv.reqch:
issueBulkRequest(e.URL, rv, req)
case upd := <-rv.update:
err := upd.writeTo(rv.w)
if err != nil {
log.Fatalf("Error sending an update: %v", err)
}
}
}
}()

return rv
}
31 changes: 31 additions & 0 deletions bytesource.go
@@ -0,0 +1,31 @@
package elasticsearch

import (
"io"
)

type bytesource struct {
ch <-chan []byte
current []byte
}

func (b *bytesource) Read(out []byte) (int, error) {
if len(b.current) == 0 {
var ok bool
b.current, ok = <-b.ch
if !ok {
return 0, io.EOF
}
}
copied := copy(out, b.current)
b.current = b.current[copied:]
return copied, nil
}

func (b *bytesource) Close() {
// Can't really do anything here.
}

func NewByteSource(from <-chan []byte) *bytesource {
return &bytesource{ch: from}
}
109 changes: 109 additions & 0 deletions es.go
@@ -0,0 +1,109 @@
// Go interface to elasticsearch.
package elasticsearch

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
)

// Reference to an ElasticSearch server.
type ElasticSearch struct {
// Base URL to elasticsearch
URL string
}

type response struct {
OK bool `json:"ok"`
Index string `json:"_index"`
Type string `json:"_type"`
Id string `json:"_id"`
Found bool `json:"found"`
Source map[string]interface{} `json:"_source"`
}

func (es *ElasticSearch) mkURL(index, doctype, id string,
params map[string]string) string {

paramv := url.Values{}
for k, v := range params {
paramv.Set(k, v)
}

return fmt.Sprintf("%s/%s/%s/%s?%s", es.URL,
url.QueryEscape(index),
url.QueryEscape(doctype),
url.QueryEscape(id),
paramv.Encode())
}

func handleResponse(resp *http.Response) (*response, error) {
defer resp.Body.Close()
if resp.StatusCode != 200 {
return nil, errors.New(resp.Status)
}

d := json.NewDecoder(resp.Body)
iresp := &response{}
err := d.Decode(iresp)
if err != nil {
return nil, err
}
if !iresp.OK {
return nil, errors.New("Response wasn't OK")
}

return iresp, nil
}

// Store a document in the index.
//
// The ID is optional in which case the ID will be generated by the
// server.
//
// Returns the new ID on success, otherwise an error.
func (es *ElasticSearch) Index(index, doctype, id string,
doc map[string]interface{},
params map[string]string) (string, error) {

data, err := json.Marshal(doc)
if err != nil {
return "", err
}

resp, err := http.DefaultClient.Post(es.mkURL(index, doctype, id, params),
"application/json", bytes.NewBuffer(data))
if err != nil {
return "", err
}
iresp, err := handleResponse(resp)
if err != nil {
return "", err
}

return iresp.Id, nil
}

// Delete an index entry.
func (es *ElasticSearch) Delete(index, doctype, id string,
params map[string]string) (bool, error) {

req, err := http.NewRequest("DELETE", es.mkURL(index, doctype, id, params),
nil)
if err != nil {
return false, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false, err
}
iresp, err := handleResponse(resp)
if err != nil {
return false, err
}

return iresp.Found, nil
}

0 comments on commit db83d8a

Please sign in to comment.