forked from go-kivik/couchdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bulk.go
104 lines (97 loc) · 2.44 KB
/
bulk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package couchdb
import (
"context"
"encoding/json"
"fmt"
"io"
"github.com/flimzy/kivik"
"github.com/flimzy/kivik/driver"
"github.com/flimzy/kivik/errors"
"github.com/go-kivik/couchdb/chttp"
)
// bulkResults iterates over the JSON array of per-document results carried in
// a _bulk_docs response.
type bulkResults struct {
	// body is the underlying response body; Close releases it.
	body io.ReadCloser
	// dec reads result objects from body, one per call to Next.
	dec *json.Decoder
}

// Compile-time check that *bulkResults implements driver.BulkResults.
var _ driver.BulkResults = (*bulkResults)(nil)
// Next decodes the next per-document result from the response array into
// update.
//
// It returns io.EOF once the decoder reports no further elements and
// consumeDelim has accepted the closing ']'. Any other non-nil return value is
// an error from reading or decoding the stream.
//
// A failure reported by the server for an individual document is not returned
// from Next. It is stored in update.Error instead, which is reset to nil on
// every call so that a reused update never carries a stale error.
func (r *bulkResults) Next(update *driver.BulkResult) error {
	if !r.dec.More() {
		if err := consumeDelim(r.dec, json.Delim(']')); err != nil {
			return err
		}
		return io.EOF
	}

	var updateResult struct {
		ID     string `json:"id"`
		Rev    string `json:"rev"`
		Error  string `json:"error"`
		Reason string `json:"reason"`
	}
	if err := r.dec.Decode(&updateResult); err != nil {
		return err
	}

	update.ID = updateResult.ID
	update.Rev = updateResult.Rev
	update.Error = nil
	if updateResult.Error == "" {
		return nil
	}

	reason := updateResult.Reason
	var status int
	switch updateResult.Error {
	case "conflict":
		status = kivik.StatusConflict
	case "forbidden":
		// Reported when a validation function rejects the document.
		status = kivik.StatusForbidden
	case "not_implemented":
		status = kivik.StatusNotImplemented
	default:
		// An unrecognized error name must still yield a non-zero status;
		// otherwise the caller sees an error whose status code reads as
		// "no error". The name is folded into the message so that it reaches
		// the caller instead of being written to stdout.
		status = kivik.StatusInternalServerError
		reason = fmt.Sprintf("%s: %s", updateResult.Error, updateResult.Reason)
	}
	update.Error = errors.Status(status, reason)
	return nil
}
// Close closes the underlying response body and reports the error, if any,
// returned by that close.
func (r *bulkResults) Close() error {
	closeErr := r.body.Close()
	return closeErr
}
// BulkDocs posts docs to the database's _bulk_docs endpoint in a single
// request and returns an iterator over the per-document results.
//
// A 201 response yields the iterator with a nil error. A 417 response yields
// the iterator together with a non-nil *chttp.HTTPError, signalling that at
// least one document was rejected. Any other status is converted with
// chttp.ResponseError and no iterator is returned.
//
// An error from encoding the request body takes precedence over an error from
// the request itself.
func (d *db) BulkDocs(ctx context.Context, docs []interface{}) (driver.BulkResults, error) {
	var cancel context.CancelFunc
	ctx, cancel = context.WithCancel(ctx)
	// NOTE(review): cancel runs when BulkDocs returns, but the response body
	// is handed to the caller for streaming. If DoReq attaches ctx to the HTTP
	// request, reads of a body that has not been fully buffered may fail with
	// a cancellation error — confirm, and consider deferring cancel to Close.
	defer cancel()

	body, errFunc := chttp.EncodeBody(map[string]interface{}{"docs": docs}, cancel)
	opts := &chttp.Options{
		Body:        body,
		ForceCommit: d.forceCommit,
	}
	resp, err := d.Client.DoReq(ctx, kivik.MethodPost, d.path("_bulk_docs", nil), opts)
	if jsonErr := errFunc(); jsonErr != nil {
		// DoReq may hand back a nil response alongside its own error (for
		// instance when the request was aborted), so guard the dereference.
		if resp != nil && resp.Body != nil {
			_ = resp.Body.Close() // best effort; the encoding error is what gets reported
		}
		return nil, jsonErr
	}
	if err != nil {
		return nil, err
	}

	switch resp.StatusCode {
	case kivik.StatusCreated:
		// Nothing to do
	case kivik.StatusExpectationFailed:
		err = &chttp.HTTPError{
			Code:   kivik.StatusExpectationFailed,
			Reason: "one or more document was rejected",
		}
	default:
		if resp.StatusCode < 400 {
			fmt.Printf("Unexpected BulkDoc response code: %d", resp.StatusCode)
		}
		// All other errors can consume the response body and return immediately
		// NOTE(review): if chttp.ResponseError returns nil for statuses below
		// 400, this path returns (nil, nil) to the caller — confirm.
		return nil, chttp.ResponseError(resp)
	}

	dec := json.NewDecoder(resp.Body)
	// Consume the opening '[' char
	if jsonErr := consumeDelim(dec, json.Delim('[')); jsonErr != nil {
		// No iterator is returned on this path, so nothing else would ever
		// close the body.
		_ = resp.Body.Close() // best effort; the decode error is what gets reported
		return nil, jsonErr
	}
	return &bulkResults{
		body: resp.Body,
		dec:  dec,
	}, err
}