forked from threatgrid/jqpipe-go
/
jqpipe.go
137 lines (118 loc) · 3.49 KB
/
jqpipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
Wraps the "jq" utility as a pipe.
This package makes it easy for Go programs to filter JSON data using
stedolan's "jq". This is used internally at ThreatGRID as a sort of
expedient map/reduce in its distributed data store and in its "expectjq"
test utility.
*/
package jq
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"os/exec"
)
// Eval starts a new Jq process to evaluate an expression with json input
func Eval(js string, expr string, opts ...string) ([]json.RawMessage, error) {
jq, err := New(bytes.NewReader([]byte(js)), expr, opts...)
if err != nil {
return nil, err
}
ret := make([]json.RawMessage, 0, 16)
for {
next, err := jq.Next()
switch err {
case nil:
ret = append(ret, next)
case io.EOF:
return ret, nil
default:
return ret, err
}
}
panic("unreachable") // for go 1.0
}
// New wraps a jq.Pipe around an existing io.Reader, applying a JQ expression
func New(r io.Reader, expr string, opts ...string) (*Pipe, error) {
var err error
proc := new(Pipe)
opts = append(opts, expr)
proc.jq = exec.Command("jq", opts...)
proc.jq.Stdin = r
proc.stdout, err = proc.jq.StdoutPipe()
if err != nil {
return nil, err
}
proc.jq.Stderr = &proc.stderr
err = proc.jq.Start()
if err != nil {
proc.stdout.Close()
return nil, err
}
proc.dec = json.NewDecoder(proc.stdout)
return proc, nil
}
// Pipe encapsulates a child "jq" process with a fixed expression, returning each JSON output from jq.
type Pipe struct {
jq *exec.Cmd
dec *json.Decoder
stdout io.ReadCloser
stderr bytes.Buffer
}
// Next provides the next JSON result from JQ. If there are no more results, io.EOF is returned.
func (p *Pipe) Next() (json.RawMessage, error) {
var msg json.RawMessage
err := p.dec.Decode(&msg)
// Guard against a Next() after we have terminated.
if p.jq.ProcessState.Exited() {
fmt.Println("jq process exited")
return nil, io.EOF
}
fmt.Println("jq process status after decoding message", p.jq.ProcessState.String())
if err == nil {
return msg, nil
}
p.stdout.Close()
// if we have a decoding error, jq is sick and we need to kill it with fire..
if err != io.EOF {
p.Close()
return nil, err
}
fmt.Println("jq process status before being killed", p.jq.ProcessState.String())
// terminate jq (if it hasn't died already)
killErr := p.jq.Process.Kill()
fmt.Println("jq process killed, error was [", killErr, "]")
waitErr := p.jq.Wait()
fmt.Println("done waiting for jq process, error was [", waitErr, "]")
fmt.Println("jq process status after being killed", p.jq.ProcessState.String())
// if jq complained, that's our error
if p.stderr.Len() != 0 {
return nil, errors.New(p.stderr.String())
}
fmt.Println("jq process status before checking if it is success", p.jq.ProcessState.String())
fmt.Println("jq process exit code", p.jq.ProcessState.ExitCode())
if p.jq.ProcessState.Success() || p.jq.ProcessState.Exited() || p.jq.ProcessState.String() == "signal: killed" {
return nil, io.EOF
}
fmt.Println("jq process status before unexplained error", p.jq.ProcessState.Pid(), p.jq.ProcessState.String())
return nil, errors.New("unexplained jq failure")
}
// Close attempts to halt the jq process if it has not already exited. This is only necessary if Next has not returned io.EOF.
func (p *Pipe) Close() error {
if p.stdout != nil {
p.stdout.Close()
}
if p.jq == nil {
return nil
}
if p.jq.ProcessState != nil && p.jq.ProcessState.Exited() {
return nil
}
if p.jq.Process != nil {
p.jq.Process.Kill()
go p.jq.Process.Wait()
}
return nil
}