forked from linkedin/goavro
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
176 lines (149 loc) · 4.12 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package main
import (
"bufio"
bin "encoding/binary"
hex "encoding/hex"
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"github.com/crxfoz/goavro/v2"
)
// roundtrip is a tool for checking avro
//
// incoming data is assumed to be standard json
// incoming json is required to be one json object per line
// use `jq -c .` if you need to get it onto one line
//
// you can write out your avro in binary form and stop there
// which is useful for cases where you might want to send it off into other tools
//
// you can also do a roundtrip of decode/encode
// which allows you to see if your avro schema matches your expectations
//
// If you want to use an encoded schemaid then specify a schemaid with -sid
// it will be encoded per a common standard (one null byte, 16 bytes of schemaid)
// It's NOT the standard SOE
// SOE should be added
// Probably OCF should be added too
//
// EXAMPLE
//
// kubectl get events -w -o json | jq -c . | ./roundtrip -sid aa6b1ca0e1ee2d885bfbc747f4a4011b -avsc event-schema.json -rt
// MakeAvroHeader builds the record header for the given schema id.
//
// schemaid is a hex string; it is decoded to raw bytes and prefixed with a
// single zero byte. An empty schemaid therefore yields a one-byte header
// holding just the zero byte. If schemaid is not valid hex, the decoding
// error is returned and header is nil.
func MakeAvroHeader(schemaid string) (header []byte, err error) {
	idBytes, decodeErr := hex.DecodeString(schemaid)
	if decodeErr != nil {
		return nil, decodeErr
	}

	// One leading zero byte followed by the decoded schema id.
	header = make([]byte, 0, 1+len(idBytes))
	header = append(header, 0)
	header = append(header, idBytes...)
	return header, nil
}
// main is the entry point of the roundtrip tool.
//
// It loads the Avro schema named by -avsc, then reads the input named by
// -data (stdin when "-") one line at a time, treating every non-empty line as
// one standard-JSON datum. Each datum is encoded to Avro binary. With -bin the
// schemaid header followed by the binary datum is written to that file
// (truncated unless -append is given). With -rt the binary is decoded again
// and printed in textual form so it can be compared with the input line.
//
// Every failure panics, after echoing the offending input line where there
// is one.
func main() {
	// Upper bound on the length of a single input line. bufio.Scanner stops
	// with "token too long" at 64 KiB by default, which real-world JSON
	// documents can easily exceed.
	const maxLineBytes = 64 * 1024 * 1024

	avsc := flag.String("avsc", "", "the avro schema")
	data := flag.String("data", "-", "(default stdin) the data that corresponds to the avro schema or error - ONE LINE PER DATA ITEM")
	schemaid := flag.String("sid", "", "the schemaid which is normally the md5hash of rht schema itself")
	roundtrip := flag.Bool("rt", false, "do full round trip to try to rebuild the original data string")
	xxd := flag.String("bin", "", "write out the binary data to this file - look at it with xxd if you want to")
	appendBin := flag.Bool("append", false, "append to the output binary file instead of trunc")
	flag.Parse()

	schemaBytes, err := ioutil.ReadFile(*avsc)
	if err != nil {
		panic(fmt.Sprintf("Failed to read avsc file:%s:error:%v:", *avsc, err))
	}
	codec, err := goavro.NewCodecForStandardJSON(string(schemaBytes))
	if err != nil {
		panic(err)
	}

	// Select the input: stdin for "-", otherwise the named file.
	var input io.Reader
	if *data == "-" {
		input = os.Stdin
	} else {
		file, err := os.Open(*data)
		if err != nil {
			panic(fmt.Sprintf("Failed to open data file:%s:error:%v:", *data, err))
		}
		defer file.Close()
		input = bufio.NewReader(file)
	}

	// binFile stays nil unless -bin was given; the loop below only needs a
	// cheap nil check to decide whether to write.
	var binFile *os.File
	if len(*xxd) > 0 {
		mode := os.O_WRONLY | os.O_CREATE
		if *appendBin {
			mode |= os.O_APPEND
		} else {
			mode |= os.O_TRUNC
		}
		binFile, err = os.OpenFile(*xxd, mode, 0600)
		if err != nil {
			panic(err)
		}
		defer func() {
			// A failed Close on a written file can mean lost data, so
			// report it instead of dropping it.
			if cerr := binFile.Close(); cerr != nil {
				fmt.Fprintf(os.Stderr, "closing %s: %v\n", *xxd, cerr)
			}
		}()
	}

	// The header depends only on -sid, so build it once up front. This also
	// rejects a malformed schemaid before any data is processed.
	// NOTE(review): with an empty -sid the header is still a single 0x00
	// byte, which gets written ahead of every record in the -bin file;
	// confirm that this is intended.
	header, err := MakeAvroHeader(*schemaid)
	if err != nil {
		panic(fmt.Errorf("decoding schemaid %q: %w", *schemaid, err))
	}

	scanner := bufio.NewScanner(input)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)
	for scanner.Scan() {
		dat := scanner.Text()
		if len(dat) == 0 {
			fmt.Println("skipping empty line")
			continue
		}
		fmt.Println("RT in")
		fmt.Println(dat)
		fmt.Printf("encoding for schemaid:%s:\n", *schemaid)

		// Textual (JSON) -> native Go values -> Avro binary.
		avroNative, _, err := codec.NativeFromTextual([]byte(dat))
		if err != nil {
			fmt.Println(dat)
			panic(err)
		}
		avrobin, err := codec.BinaryFromNative(nil, avroNative)
		if err != nil {
			fmt.Println(dat)
			panic(err)
		}

		if binFile != nil {
			// Both buffers are byte slices, so they are written verbatim;
			// the byte order has no effect on single-byte elements.
			for _, buf := range [][]byte{header, avrobin} {
				if err := bin.Write(binFile, bin.LittleEndian, buf); err != nil {
					fmt.Println(dat)
					panic(err)
				}
			}
		}

		if *roundtrip {
			// Decoding builds fresh Go maps, so the field order of the
			// output may differ from the input line.
			rtNative, _, err := codec.NativeFromBinary(avrobin)
			if err != nil {
				fmt.Println(dat)
				panic(err)
			}
			// Convert the native Go form back to textual Avro data.
			rtText, err := codec.TextualFromNative(nil, rtNative)
			if err != nil {
				fmt.Println(dat)
				panic(err)
			}
			fmt.Println("RT out")
			fmt.Println(string(rtText))
		}
	}
	if err := scanner.Err(); err != nil {
		fmt.Println("scanner error")
		panic(err)
	}
	fmt.Println("Done with loop - no more data")
}