/
jq.go
307 lines (266 loc) · 8.42 KB
/
jq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
// Package jq provides go bindings for libjq providing a streaming filter of
// JSON documents.
//
// This package provides a thin layer on top of stedolan's libjq -- it would
// likely be helpful to read through the wiki pages about it:
//
// jv: the JSON value type https://github.com/stedolan/jq/wiki/C-API:-jv
//
// libjq: https://github.com/stedolan/jq/wiki/C-API:-libjq
package jq
/*
To build and install libjq from a jq source checkout:
$ ./configure --disable-maintainer-mode --prefix=$PWD/BUILD
$ make install-libLTLIBRARIES install-includeHEADERS
*/
/*
#cgo LDFLAGS: -ljq
#cgo linux LDFLAGS: -lm
#include <jq.h>
#include <jv.h>
#include <stdlib.h>
void install_jq_error_cb(jq_state *jq, unsigned long long id);
*/
import "C"
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"unsafe"
)
// Jq encapsulates the state needed to interface with the libjq C library.
// Obtain one from New() and release it with Close().
type Jq struct {
	// _state is the libjq handle; Close() tears it down and resets it to nil.
	_state *C.struct_jq_state
	// errorStoreId is the key under which this object's error channel is
	// registered in globalErrorChannels, or 0 when none is registered.
	errorStoreId uint64
	// running tracks the worker goroutine launched by Start() so that Close()
	// can wait for it before freeing the C state.
	running sync.WaitGroup
}
// New initializes a new Jq object and the underlying libjq state.
//
// If the cgo call to jq_init reports a non-zero errno, that errno is returned
// as the error and any state jq_init did allocate is torn down so it is not
// leaked. If jq_init returns NULL (presumably an allocation failure), an error
// is returned. The returned object should be released with Close().
func New() (*Jq, error) {
	state, errno := C.jq_init()
	if errno != nil {
		// Do not leak a state that was allocated despite errno being set.
		if state != nil {
			C.jq_teardown(&state)
		}
		return nil, errno
	}
	// Check the C pointer itself: the Go wrapper is never nil, so it cannot
	// signal that jq_init failed.
	if state == nil {
		return nil, errors.New("jq_init returned nil -- out of memory?")
	}
	return &Jq{_state: state}, nil
}
// Close releases the libjq state and the error-channel registration held by
// jq.
//
// If Start() has been called, Close blocks until the input channel it returned
// has been closed and its worker goroutine has finished, because that
// goroutine still uses the C state. Calling Close more than once is harmless.
func (jq *Jq) Close() {
	// Nothing may be freed while the Start() goroutine can still touch it.
	jq.running.Wait()

	if jq._state != nil {
		C.jq_teardown(&jq._state)
		jq._state = nil
	}

	if id := jq.errorStoreId; id != 0 {
		jq.errorStoreId = 0
		globalErrorChannels.Delete(id)
	}
}
// errorLookupState maps opaque integer IDs to error channels.
//
// Very little can be passed across the Go/C boundary, so instead of handing
// libjq the error channel itself we give it an opaque identifier (a 64-bit
// integer) and use that to look the channel up in this store.
type errorLookupState struct {
	sync.RWMutex
	// idCounter is the most recently issued ID; it only ever increases.
	idCounter uint64
	channels  map[uint64]chan<- error
}

// Add registers c under a newly issued ID and returns that ID. A fresh store
// issues 1 first, so 0 is never a valid ID and can mean "nothing registered".
func (e *errorLookupState) Add(c chan<- error) uint64 {
	id := atomic.AddUint64(&e.idCounter, 1)

	e.Lock()
	defer e.Unlock()
	e.channels[id] = c
	return id
}

// Get returns the channel registered under id. It panics when id is unknown,
// i.e. it was never added or has already been deleted.
func (e *errorLookupState) Get(id uint64) chan<- error {
	e.RLock()
	defer e.RUnlock()

	if c, found := e.channels[id]; found {
		return c
	}
	panic(fmt.Sprintf("Tried to get error channel #%d out of store but it wasn't there!", id))
}

// Delete forgets the channel registered under id; unknown IDs are ignored.
func (e *errorLookupState) Delete(id uint64) {
	e.Lock()
	defer e.Unlock()
	delete(e.channels, id)
}

// globalErrorChannels is the process-wide store used by the error callback.
// Holding each channel here also keeps it reachable by the garbage collector
// while C code refers to it only by ID.
var globalErrorChannels = errorLookupState{
	channels: make(map[uint64]chan<- error),
}
// goLibjqErrorHandler is exported to C so libjq errors can reach Go: the C
// helper installed by install_jq_error_cb presumably forwards each error here
// together with the registered id. The converted error is sent on the channel
// registered under id, blocking until someone receives it.
//
//export goLibjqErrorHandler
func goLibjqErrorHandler(id uint64, jv C.jv) {
	target := globalErrorChannels.Get(id)
	target <- _ConvertError(jv)
}
// Start compiles program and returns three channels: in, out and errs.
//
// Every *Jv sent on in is run through the program. Its results are sent on
// out, and any error is sent on errs. Close in when there is no more input;
// out and errs are closed once the remaining work is done.
//
// args is a list of key/value pairs to bind as variables into the program and
// must be an array, even if empty. Each element should be an object with
// "name" and "value" properties, where name excludes the "$" sign. For example
// `[ {"name": "n", "value": 1 } ]` is available as `$n` in the program.
//
// Start is not reentrant: do not call it again until the in channel returned
// by the previous call has been closed.
//
// Compile errors are sent on errs before any input is read, so the caller
// must be receiving from errs while sending input. If args is not an array,
// inputs are instead drained until in is closed, and only then is a single
// error sent on errs.
//
// Ownership of every *Jv sent on in passes to this package (inputs that are
// not processed are freed). Copy() a value first if you need to keep it.
func (jq *Jq) Start(program string, args *Jv) (in chan<- *Jv, out <-chan *Jv, errs <-chan error) {
	// Keep bidirectional handles for internal use. The caller only gets the
	// directional views assigned to the named results.
	inputs := make(chan *Jv)
	outputs := make(chan *Jv)
	failures := make(chan error)
	in, out, errs = inputs, outputs, failures

	// discardInputs takes ownership of (frees) every input until the caller
	// closes the input channel.
	discardInputs := func() {
		for v := range inputs {
			v.Free()
		}
	}

	// Reject a non-array args before registering any global error state.
	if args.Kind() != JV_KIND_ARRAY {
		go func() {
			discardInputs()
			failures <- fmt.Errorf("`args` parameter is of type %s not array!", args.Kind().String())
			args.Free()
			close(outputs)
			close(failures)
		}()
		return in, out, errs
	}

	// Drop any channel an earlier Start() left registered, then register ours
	// and point libjq's error callback at it. A Go func pointer cannot be
	// handed to C, so the C helper calls the exported Go handler for us:
	// https://github.com/golang/go/wiki/cgo#function-variables
	if jq.errorStoreId != 0 {
		globalErrorChannels.Delete(jq.errorStoreId)
	}
	jq.errorStoreId = globalErrorChannels.Add(failures)
	C.install_jq_error_cb(jq._state, C.ulonglong(jq.errorStoreId))

	jq.running.Add(1)
	go func() {
		if jq._Compile(program, args) {
			for v := range inputs {
				results, err := jq.Execute(v)
				for _, r := range results {
					outputs <- r
				}
				if err != nil {
					failures <- err
				}
			}
		} else {
			// Compile errors have already gone to failures via the callback.
			// Still honour the ownership contract for any inputs.
			discardInputs()
		}

		// All input consumed: signal completion to the caller, detach the
		// callback, and let Close() proceed.
		close(outputs)
		close(failures)
		C.install_jq_error_cb(jq._state, 0)
		jq.running.Done()
	}()
	return in, out, errs
}
// Execute runs the program previously compiled on this Jq against a single
// input and returns every result it produced, in order.
//
// If the program raises an uncaught jq error, the results produced before it
// are returned together with an error carrying jq's message. With no results
// the returned slice is empty but non-nil.
//
// input is handed to jq_start and is not freed here. Start() does not free it
// afterwards either, so treat it as consumed.
//
// Execute is not safe for concurrent use: the caller must ensure it is never
// called from two goroutines at the same time.
func (jq *Jq) Execute(input *Jv) ([]*Jv, error) {
	C.jq_start(jq._state, input.jv, C.int(0))

	produced := []*Jv{}
	for {
		next := &Jv{C.jq_next(jq._state)}
		if next.IsValid() {
			produced = append(produced, next)
			continue
		}
		// An invalid value ends the stream; a message on it is an uncaught
		// jq exception.
		// TODO: get file:line position in input somehow.
		if msg, ok := next.GetInvalidMessageAsString(); ok {
			return produced, errors.New(msg)
		}
		return produced, nil
	}
}
// Compile compiles prog and makes it ready for Execute().
//
// Only a single program can be compiled on a Jq object at once; calling
// Compile again replaces the current program.
//
// args is a list of key/value pairs to bind as variables into the program and
// must be an array, even if empty. Each element should be an object with
// "name" and "value" properties, where name excludes the "$" sign. For example
// `[ {"name": "n", "value": 1 } ]` is available as `$n` in the program. If
// args is not an array it is freed and a single error is returned without
// compiling.
//
// Errors libjq reports while compiling are collected and returned. If
// compilation failed but libjq reported nothing, a generic error is returned
// instead. An empty result means the program compiled successfully.
func (jq *Jq) Compile(prog string, args *Jv) (errs []error) {
	// Validate args before setting up any global error-handling state. The
	// kind is captured before Free() so args is never used after being freed.
	if kind := args.Kind(); kind != JV_KIND_ARRAY {
		args.Free()
		return []error{fmt.Errorf("`args` parameter is of type %s not array", kind.String())}
	}

	cErr := make(chan error)
	if jq.errorStoreId != 0 {
		// Start() leaves its channel registered until it is replaced or
		// Close() runs.
		globalErrorChannels.Delete(jq.errorStoreId)
	}
	jq.errorStoreId = globalErrorChannels.Add(cErr)
	C.install_jq_error_cb(jq._state, C.ulonglong(jq.errorStoreId))

	// The error callback fires synchronously inside _Compile and blocks on the
	// unbuffered cErr, so errors must be collected concurrently.
	collected := make(chan struct{})
	go func() {
		defer close(collected)
		for err := range cErr {
			if err != nil {
				errs = append(errs, err)
			}
		}
	}()

	compiled := jq._Compile(prog, args)

	// No libjq call is in flight any more. Detach the callback, then close
	// cErr to end the collector. A nil sentinel is not used because it would
	// deadlock if the callback itself ever delivered a nil error.
	C.install_jq_error_cb(jq._state, 0)
	close(cErr)
	<-collected

	globalErrorChannels.Delete(jq.errorStoreId)
	jq.errorStoreId = 0

	if !compiled && len(errs) == 0 {
		return []error{errors.New("jq_compile returned error, but no errors were reported. Oops")}
	}
	return errs
}
// _Compile passes prog and args to jq_compile_args and reports whether
// compilation succeeded (a non-zero status).
//
// Compile errors are not returned here. They go to whatever error callback is
// currently installed on the state. args.jv is passed by value and is not
// freed by this wrapper (presumably libjq consumes it). The temporary C copy
// of prog is freed before returning.
func (jq *Jq) _Compile(prog string, args *Jv) bool {
	source := C.CString(prog)
	defer C.free(unsafe.Pointer(source))

	status := C.jq_compile_args(jq._state, source, args.jv)
	return status != 0
}