forked from cloudflare/go-stream
-
Notifications
You must be signed in to change notification settings - Fork 3
/
json.go
69 lines (60 loc) · 1.66 KB
/
json.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package encoding
import (
"encoding/json"
"fmt"
"github.com/cevian/go-stream/stream"
"github.com/cevian/go-stream/stream/mapper"
"github.com/cevian/go-stream/util/slog"
//"reflect"
)
/* Example decoder usage:

	decFn := func(in []byte, decoder func([]byte, interface{})) stream.Object {
		var i int
		decoder(in, &i)
		return i
	}
	intDecOp := encoding.NewJsonDecodeOp(decFn)
*/
// JsonGeneralDecoder returns a function that unmarshals JSON-encoded bytes
// into the destination passed as its second argument.
//
// The returned function has no error result: when json.Unmarshal fails, the
// error and the offending input are reported through slog.Errorf and the
// call returns normally.
func JsonGeneralDecoder() func([]byte, interface{}) {
	return func(raw []byte, target interface{}) {
		if err := json.Unmarshal(raw, target); err != nil {
			slog.Errorf("Error unmarshaling json: %v %v\n", err.Error(), string(raw))
		}
	}
}
// JsonGeneralEncoder returns a function that encodes an arbitrary value as
// JSON, yielding the encoded bytes or the error reported by json.Marshal.
func JsonGeneralEncoder() func(interface{}) ([]byte, error) {
	// json.Marshal already has exactly the required signature, so it is
	// handed back as-is instead of being wrapped in a pass-through closure.
	return json.Marshal
}
// NewJsonDecodeOp builds an operator named "JsonDecodeOp" that decodes each
// incoming object with decFn and sends the decoded value to the output.
//
// decFn is called with the raw input bytes and a decoder callback obtained
// from JsonGeneralDecoder; it is expected to populate a value through the
// callback and return it (see the example usage in this file). Each worker
// created for the operator gets its own decoder callback.
//
// Input objects must be []byte. Any other type makes the worker function
// return an error instead of panicking on the type assertion.
func NewJsonDecodeOp(decFn func([]byte, func([]byte, interface{})) stream.Object) stream.InOutOperator {
	name := "JsonDecodeOp"
	workerCreator := func() mapper.Worker {
		decoder := JsonGeneralDecoder()
		fn := func(obj stream.Object, out mapper.Outputer) error {
			// Checked assertion: a wrongly typed input is reported through
			// the worker's error return rather than crashing the process.
			raw, ok := obj.([]byte)
			if !ok {
				return fmt.Errorf("%s: expected []byte input, got %T", name, obj)
			}
			decoded := decFn(raw, decoder)
			out.Sending(1).Send(decoded)
			return nil
		}
		return mapper.NewWorker(fn, name)
	}
	return mapper.NewClosureOp(workerCreator, nil, name)
}
// NewJsonEncodeOp builds an operator named "JsonEncodeOp" that marshals each
// incoming object to JSON and sends the resulting bytes to the output.
//
// The object is handed to json.Marshal as-is, so any value json.Marshal can
// encode is accepted. A marshaling failure is returned from the worker
// function as an error that includes the offending object.
func NewJsonEncodeOp() stream.Operator {
	name := "JsonEncodeOp"
	workerCreator := func() mapper.Worker {
		fn := func(obj stream.Object, out mapper.Outputer) error {
			// Marshal the object directly. Asserting it to []byte first would
			// panic for every non-[]byte input, and for []byte inputs the
			// encoded output is the same either way.
			res, err := json.Marshal(obj)
			if err != nil {
				return fmt.Errorf("Error marshaling json %v\t%+v", err, obj)
			}
			out.Sending(1).Send(res)
			return nil
		}
		return mapper.NewWorker(fn, name)
	}
	return mapper.NewClosureOp(workerCreator, nil, name)
}