forked from cloudflare/go-stream
-
Notifications
You must be signed in to change notification settings - Fork 3
/
gob.go
146 lines (131 loc) · 3.94 KB
/
gob.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package encoding
import (
"bytes"
"encoding/gob"
"fmt"
"io"
"log"
"github.com/cevian/go-stream/stream"
"github.com/cevian/go-stream/stream/mapper"
//"reflect"
)
/* Example Decoder Usage
//intDecGenFn := func (obj stream.Object, out mapper.Outputter) {
// decoder := encoding.GobGeneralDecoder()
// return func(obj stream.Object, out mapper.Outputter) {
// var i int
// decoder(obj.([]byte), &i)
// out.Out(1) <- i
// }
//}
decFn := func (in []byte, decoder func([]byte, interface{}) ) stream.Object{
var i int
decoder(in, &i)
return i
}
intDecOp := encoding.NewGobDecodeOp(decFn)
*/
// GobGeneralDecoder returns a function that decodes one self-contained gob
// stream from input into toPopulate, which must be a pointer to the value
// being filled in.
//
// Look at the notes in NewGobEncodeOp since they are relevant here: this uses
// a model where each received input corresponds to a separate gob stream, so
// a new gob.Decoder is created for every call.
//
// Decode failures are logged and otherwise ignored; the returned function has
// no error result, so toPopulate may be left unmodified or partially filled.
func GobGeneralDecoder() func([]byte, interface{}) {
	return func(input []byte, toPopulate interface{}) {
		// Each input is an independent stream, so it gets its own reader and
		// decoder. bytes.Reader implements io.ByteReader, which lets gob read
		// from it directly instead of adding its own buffering layer.
		dec := gob.NewDecoder(bytes.NewReader(input))
		if err := dec.Decode(toPopulate); err != nil {
			log.Printf("Error unmarshaling gob: %v\n", err.Error())
		}
	}
}
// NewGobDecodeOp builds an operator that turns incoming []byte objects into
// decoded values.
//
// decFn is called once per input with the raw bytes and a decoder function
// obtained from GobGeneralDecoder; it is expected to declare the concrete
// target, call the decoder on it, and return the result, which is then sent
// downstream. See the "Example Decoder Usage" comment at the top of this file.
//
// Inputs that are not []byte make the worker return an error instead of
// panicking on the type assertion.
func NewGobDecodeOp(
	decFn func([]byte, func([]byte, interface{})) stream.Object) stream.InOutOperator {
	name := "GobDecodeOp"
	newWorker := func() mapper.Worker {
		// One decoder function per worker created by this closure.
		decoder := GobGeneralDecoder()
		process := func(obj stream.Object, out mapper.Outputer) error {
			raw, ok := obj.([]byte)
			if !ok {
				return fmt.Errorf("%s: expected []byte input, got %T", name, obj)
			}
			// NOTE(review): Sending(1) presumably announces one outgoing
			// item - confirm against mapper.Outputer.
			out.Sending(1).Send(decFn(raw, decoder))
			return nil
		}
		return mapper.NewWorker(process, name)
	}
	return mapper.NewClosureOp(newWorker, nil, name)
}
// NewGobEncodeOp builds an operator that gob-encodes every incoming object
// and sends the resulting bytes downstream as a freshly allocated []byte.
//
// An encoding failure is returned from the worker function as an error and
// nothing is sent for that input.
func NewGobEncodeOp() stream.InOutOperator {
	/* Each encoder provides a stateful stream. So we have two choices:
	   either run this operator not in parallel and get a stateful stream,
	   or run this in parallel but use a new encoder for each input. We
	   choose the latter but plan to buffer upstream so we get big streams
	   coming out. (The original note adds that each output will be
	   compressed separately; no compression happens in this function.)
	*/
	name := "GobEncodeOp"
	workerCreator := func() mapper.Worker {
		// Scratch buffer owned by this worker; reused across calls so its
		// capacity is retained.
		var buf bytes.Buffer
		fn := func(obj stream.Object, outputter mapper.Outputer) error {
			// Start from an empty buffer. A previous call whose Encode failed
			// part-way may have left bytes behind, and those must not leak
			// into this output.
			buf.Reset()
			enc := gob.NewEncoder(&buf) // each output is an independent stream
			if err := enc.Encode(obj); err != nil {
				return fmt.Errorf("Error marshaling gob: %v\n", err.Error())
			}
			// Hand out a copy: buf's backing storage is overwritten by the
			// next call.
			out := make([]byte, buf.Len())
			copy(out, buf.Bytes())
			// NOTE(review): Sending(1) presumably announces one outgoing
			// item - confirm against mapper.Outputer.
			outputter.Sending(1).Send(out)
			return nil
		}
		return mapper.NewWorker(fn, name)
	}
	op := mapper.NewClosureOp(workerCreator, nil, name)
	//op.Parallel = false
	return op
}
/////////////////////////////////////////////////// Misc ///////////////////////////////////////////////
// ByteReader is a minimal io.Reader over a byte slice, adapted from
// bytes.Reader. Its fields are plain unexported values, so code in this
// package can point an existing reader at a new slice by assigning s and
// resetting i to zero.
type ByteReader struct {
	s []byte // data being read
	i int    // index of the next unread byte in s
}

// Read copies unread bytes into b and advances the read position by the
// number of bytes copied. A zero-length b always yields (0, nil); otherwise
// io.EOF is returned once every byte of the slice has been consumed.
func (r *ByteReader) Read(b []byte) (n int, err error) {
	switch {
	case len(b) == 0:
		return 0, nil
	case r.i >= len(r.s):
		return 0, io.EOF
	}
	unread := r.s[r.i:]
	n = copy(b, unread)
	r.i += n
	return n, nil
}
///////////////////////////////////////////OLD STUFF///////////////////////////////////////////////////////////
/*
Works, but we don't want to support transform functions that output []interface{}
func NewGobDecodeRopUnsafe(inch chan []byte, outch interface{}, typ reflect.Type) stream.Operator { //the chain constructor should decide on the format of outch
br := ByteReader{}
dec := gob.NewDecoder(&br)
fn := func(in []byte, closenotifier chan<- bool) []interface{} {
br.s = in
br.i = 0
obj := reflect.New(typ)
err := dec.Decode(obj.Interface())
if err != nil {
log.Printf("Error unmarshaling gob: %v\n", err.Error())
}
return []interface{}{obj.Elem().Interface()}
}
return stream.NewReflectOp(fn, inch, outch, "GobDecodeRop")
}
*/