-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathbulk.go
155 lines (136 loc) · 3.87 KB
/
bulk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package kivik
import (
"context"
"errors"
"io"
"net/http"
"github.com/go-kivik/kivik/v3/driver"
)
// BulkResults is an iterator over the results of a BulkDocs query.
type BulkResults struct {
*iter
bulki driver.BulkResults
}
// Next returns the next BulkResult from the feed. If an error occurs, it will
// be returned and the feed closed. io.EOF will be returned when there are no
// more results.
func (r *BulkResults) Next() bool {
return r.iter.Next()
}
// Err returns the error, if any, that was encountered during iteration. Err
// may be called after an explicit or implicit Close.
func (r *BulkResults) Err() error {
return r.iter.Err()
}
// Close closes the feed. Any unread updates will still be accessible via
// Next().
func (r *BulkResults) Close() error {
return r.iter.Close()
}
type bulkIterator struct{ driver.BulkResults }
var _ iterator = &bulkIterator{}
func (r *bulkIterator) Next(i interface{}) error {
return r.BulkResults.Next(i.(*driver.BulkResult))
}
func newBulkResults(ctx context.Context, bulki driver.BulkResults) *BulkResults {
return &BulkResults{
iter: newIterator(ctx, &bulkIterator{bulki}, &driver.BulkResult{}),
bulki: bulki,
}
}
// ID returns the document ID name for the current result.
func (r *BulkResults) ID() string {
runlock, err := r.rlock()
if err != nil {
return ""
}
defer runlock()
return r.curVal.(*driver.BulkResult).ID
}
// Rev returns the revision of the current curResult.
func (r *BulkResults) Rev() string {
runlock, err := r.rlock()
if err != nil {
return ""
}
defer runlock()
return r.curVal.(*driver.BulkResult).Rev
}
// UpdateErr returns the error associated with the current result, or nil
// if none. Do not confuse this with Err, which returns an error for the
// iterator itself.
func (r *BulkResults) UpdateErr() error {
runlock, err := r.rlock()
if err != nil {
return nil
}
defer runlock()
return r.curVal.(*driver.BulkResult).Error
}
// BulkDocs allows you to create and update multiple documents at the same time
// within a single request. This function returns an iterator over the results
// of the bulk operation.
// See http://docs.couchdb.org/en/2.0.0/api/database/bulk-api.html#db-bulk-docs
//
// As with Put, each individual document may be a JSON-marshable object, or a
// raw JSON string in a []byte, json.RawMessage, or io.Reader.
func (db *DB) BulkDocs(ctx context.Context, docs []interface{}, options ...Options) (*BulkResults, error) {
docsi, err := docsInterfaceSlice(docs)
if err != nil {
return nil, err
}
if len(docsi) == 0 {
return nil, &Error{HTTPStatus: http.StatusBadRequest, Err: errors.New("kivik: no documents provided")}
}
opts := mergeOptions(options...)
if bulkDocer, ok := db.driverDB.(driver.BulkDocer); ok {
bulki, err := bulkDocer.BulkDocs(ctx, docsi, opts)
if err != nil {
return nil, err
}
return newBulkResults(ctx, bulki), nil
}
var results []driver.BulkResult
for _, doc := range docsi {
var err error
var id, rev string
if docID, ok := extractDocID(doc); ok {
id = docID
rev, err = db.Put(ctx, id, doc, opts)
} else {
id, rev, err = db.CreateDoc(ctx, doc, opts)
}
results = append(results, driver.BulkResult{
ID: id,
Rev: rev,
Error: err,
})
}
return newBulkResults(ctx, &emulatedBulkResults{results}), nil
}
type emulatedBulkResults struct {
results []driver.BulkResult
}
var _ driver.BulkResults = &emulatedBulkResults{}
func (r *emulatedBulkResults) Close() error {
r.results = nil
return nil
}
func (r *emulatedBulkResults) Next(res *driver.BulkResult) error {
if len(r.results) == 0 {
return io.EOF
}
*res = r.results[0]
r.results = r.results[1:]
return nil
}
func docsInterfaceSlice(docsi []interface{}) ([]interface{}, error) {
for i, doc := range docsi {
x, err := normalizeFromJSON(doc)
if err != nil {
return nil, &Error{HTTPStatus: http.StatusBadRequest, Err: err}
}
docsi[i] = x
}
return docsi, nil
}