-
Notifications
You must be signed in to change notification settings - Fork 330
/
decoder.go
156 lines (125 loc) · 2.76 KB
/
decoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package json
import (
"context"
"encoding/json"
"io"
"io/ioutil"
"os"
"strings"
"github.com/gabriel-vasile/mimetype"
)
// decoder yields the entries of a JSON object stream, spooled to a temporary
// file, as rows of string fields.
type decoder struct {
	ident  string        // identifier reported by Ident, settable via SetIdent
	src    *os.File      // temporary file holding a copy of the input
	reader *json.Decoder // decoder over src that Next reads entries from
	header []string      // every field name seen across all entries
	count  uint64        // number of entries found in the input
}
// CanDecodeFile sniffs the content of f and reports whether the detected
// format has an extension this package decodes (see CanDecodeExt).
// Detection failures are reported as false. Reading advances f.
func CanDecodeFile(f io.Reader) bool {
	if detected, err := mimetype.DetectReader(f); err == nil {
		return CanDecodeExt(detected.Extension())
	}
	return false
}
// CanDecodeMime reports whether the MIME type m denotes JSON or
// newline-delimited JSON.
//
// Accepted types are application/json, application/jsonlines and
// application/x-ndjson (the type commonly used for .ndjson files, which
// CanDecodeExt also accepts). Media-type parameters such as
// "; charset=utf-8" are ignored, and the comparison is case-insensitive
// because MIME type and subtype names are case-insensitive.
func CanDecodeMime(m string) bool {
	// Strip any parameters; only "type/subtype" matters here.
	if i := strings.IndexByte(m, ';'); i >= 0 {
		m = m[:i]
	}
	switch strings.ToLower(strings.TrimSpace(m)) {
	case "application/json", "application/jsonlines", "application/x-ndjson":
		return true
	}
	return false
}
// CanDecodeExt reports whether ext names a file type this package decodes:
// json, jsonl or ndjson.
//
// ext may be a bare extension ("json"), a dotted one (".json") or a full
// file name ("data.json"); only the part after the last dot is considered.
// Surrounding whitespace is ignored and the match is case-insensitive, so
// "DATA.JSON" is accepted.
func CanDecodeExt(ext string) bool {
	if i := strings.LastIndexByte(ext, '.'); i >= 0 {
		ext = ext[i+1:]
	}
	switch strings.ToLower(strings.TrimSpace(ext)) {
	case "json", "jsonl", "ndjson":
		return true
	}
	return false
}
// Decoder inits a new JSON decoder from the given reader.
//
// The input must be a stream of JSON objects (e.g. newline-delimited JSON)
// whose values are all strings; each object is one entry. The input is
// copied to a temporary file and scanned once up front. That scan counts
// the entries (see Count) and collects every field name used by any of them
// (see Fields). Field names are appended in the order they are first seen.
// Names first seen in the same entry follow Go map iteration order, which
// is not deterministic.
//
// The returned decoder is positioned at the first entry, so Next can be
// called right away. Call Cleanup when done with it. On error, the temporary
// file is closed and removed and a nil decoder is returned.
//
// @todo hold small files in mem to avoid needless disk access
func Decoder(r io.Reader, ident string) (*decoder, error) {
	src, err := ioutil.TempFile(os.TempDir(), "*.ndjson")
	if err != nil {
		return nil, err
	}
	out := &decoder{
		ident: ident,
		src:   src,
	}

	// fail releases the temporary file. The caller never receives a decoder
	// on error, so it could not call Cleanup and the file would leak.
	// Both steps are best-effort; the original error is what gets reported.
	fail := func(err error) (*decoder, error) {
		_ = src.Close()
		_ = os.Remove(src.Name())
		return nil, err
	}

	if err = out.flushTemp(r); err != nil {
		return fail(err)
	}

	seenHeader := make(map[string]bool)
	scan := json.NewDecoder(src)
	for scan.More() {
		// Decode into a fresh map each time: decoding into a reused map
		// would merge in the keys of earlier entries.
		var aux map[string]string
		err = scan.Decode(&aux)
		if err == io.EOF {
			break
		}
		if err != nil {
			return fail(err)
		}
		for f := range aux {
			if !seenHeader[f] {
				seenHeader[f] = true
				out.header = append(out.header, f)
			}
		}
		// Entry count
		out.count++
	}

	// Rewind and hand Next a fresh json.Decoder. The scanning decoder has
	// buffered input and has already consumed the whole stream.
	if _, err = src.Seek(0, io.SeekStart); err != nil {
		return fail(err)
	}
	out.reader = json.NewDecoder(src)
	return out, nil
}
// Cleanup releases the temporary file backing the decoder by closing its
// handle and removing it from disk. Call it once the decoder is no longer
// needed; the decoder must not be used afterwards. It returns the error
// from removing the file, if any. A decoder without a backing file is a
// no-op.
func (d *decoder) Cleanup() error {
	if d.src == nil {
		return nil
	}
	// Close first. The descriptor would otherwise leak, and some platforms
	// refuse to remove a file that is still open. A close error is not
	// reported because the file and its contents are discarded anyway.
	_ = d.src.Close()
	return os.Remove(d.src.Name())
}
// SetIdent replaces the decoder's identifier (initially the one passed to
// Decoder) with ident.
func (d *decoder) SetIdent(ident string) { d.ident = ident }
// Ident reports the identifier currently assigned to the decoder.
func (d *decoder) Ident() (ident string) {
	ident = d.ident
	return
}
// Fields returns every available field in this dataset.
//
// The result is a copy. Callers may sort or modify it without affecting the
// decoder, which relies on its own list to pad missing fields in Next.
// It is nil when no fields were found.
func (d *decoder) Fields() []string {
	return append([]string(nil), d.header...)
}
// Reset rewinds the decoder so the next call to Next yields the first entry
// again. The context argument is not used.
func (d *decoder) Reset(_ context.Context) error {
	if _, err := d.src.Seek(0, io.SeekStart); err != nil {
		return err
	}
	// Any buffered state in the previous json.Decoder is stale after the
	// seek, so start over with a new one.
	d.reader = json.NewDecoder(d.src)
	return nil
}
// Next returns the field: value mapping for the next row by filling out.
//
// out should be a non-nil map and may be reused across calls. Each call
// writes every key present in the entry. Every field from Fields that the
// entry lacks is set to "", to stay consistent with CSV. Values therefore
// never carry over from a previously decoded entry. Keys in out that are
// not fields are left untouched. If out is nil, the entry is consumed and
// discarded.
//
// more is false with a nil error once the input is exhausted. A decoding
// error (malformed JSON, non-string values) is returned as is.
func (d *decoder) Next(_ context.Context, out map[string]string) (more bool, err error) {
	// Decode into a fresh map rather than into out. Decoding into a non-nil
	// map merges with its existing contents, which would leave stale values
	// behind. Decoding a JSON null would also replace the map with nil.
	var row map[string]string
	err = d.reader.Decode(&row)
	if err == io.EOF {
		return false, nil
	}
	if err != nil {
		return false, err
	}

	if out == nil {
		return true, nil
	}
	for k, v := range row {
		out[k] = v
	}
	// Empty out missing fields to keep consistent with CSV
	for _, h := range d.header {
		if _, ok := row[h]; !ok {
			out[h] = ""
		}
	}
	return true, nil
}
// Count reports how many entries the dataset holds, as tallied when the
// decoder was created.
func (d *decoder) Count() (n uint64) {
	n = d.count
	return
}
// flushTemp copies everything from r into the decoder's temporary file and
// rewinds the file to its start so it can be read back. An error from
// either the copy or the rewind is returned. A failed rewind previously went
// unnoticed and left the file positioned at its end.
func (d *decoder) flushTemp(r io.Reader) (err error) {
	if _, err = io.Copy(d.src, r); err != nil {
		return err
	}
	_, err = d.src.Seek(0, io.SeekStart)
	return err
}