/
json_reader.go
205 lines (173 loc) · 4.97 KB
/
json_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package array
import (
"errors"
"fmt"
"io"
"sync/atomic"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/internal/debug"
"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/apache/arrow/go/v17/internal/json"
)
// Option is a functional option applied by NewJSONReader to configure
// the reader before it is used.
type Option func(config)

// config is an opaque target for options; concrete types (currently only
// *JSONReader) are matched by type switch inside each Option.
type config interface{}
// WithChunk sets the chunk size for reading in json records. The default is to
// read in one row per record batch as a single object. If chunk size is set to
// a negative value, then the entire file is read as a single record batch.
// Otherwise a record batch is read in with chunk size rows per record batch until
// it reaches EOF.
func WithChunk(n int) Option {
	return func(cfg config) {
		switch cfg := cfg.(type) {
		case *JSONReader:
			cfg.chunk = n
		default:
			// Fix: the message previously read "arrow/json)" with a stray
			// closing paren; the conventional prefix is "arrow/json:".
			panic(fmt.Errorf("arrow/json: unknown config type %T", cfg))
		}
	}
}
// WithAllocator specifies the allocator to use for creating the record batches,
// if it is not called, then memory.DefaultAllocator will be used.
func WithAllocator(mem memory.Allocator) Option {
	return func(cfg config) {
		switch cfg := cfg.(type) {
		case *JSONReader:
			cfg.mem = mem
		default:
			// Fix: the message previously read "arrow/json)" with a stray
			// closing paren; the conventional prefix is "arrow/json:".
			panic(fmt.Errorf("arrow/json: unknown config type %T", cfg))
		}
	}
}
// JSONReader is a json reader that meets the RecordReader interface definition.
//
// To read in an array of objects as a record, you can use RecordFromJSON
// which is equivalent to reading the json as a struct array whose fields are
// the columns of the record. This primarily exists to fit the RecordReader
// interface as a matching reader for the csv reader.
type JSONReader struct {
	r      *json.Decoder // streaming decoder over the underlying io.Reader
	schema *arrow.Schema // schema every produced record conforms to
	bldr   *RecordBuilder

	refs int64        // atomic reference count; reader is torn down when it hits 0
	cur  arrow.Record // most recently produced record; released on the next call to Next
	err  error        // last non-EOF error from decoding

	chunk int  // rows per record: <0 = whole file, 1 = one row (default), >1 = batches of chunk
	done  bool // set once decoding hits EOF or an error

	mem  memory.Allocator
	next func() bool // strategy selected in NewJSONReader based on chunk
}
// NewJSONReader returns a json RecordReader which expects to find one json object
// per row of dataset. Using WithChunk can control how many rows are processed
// per record, which is how many objects become a single record from the file.
//
// If it is desired to write out an array of rows, then simply use RecordToStructArray
// and json.Marshal the struct array for the same effect.
func NewJSONReader(r io.Reader, schema *arrow.Schema, opts ...Option) *JSONReader {
	reader := &JSONReader{
		r:      json.NewDecoder(r),
		schema: schema,
		refs:   1,
		chunk:  1,
	}

	// Apply caller-supplied options before constructing anything that
	// depends on them (the allocator in particular).
	for _, opt := range opts {
		opt(reader)
	}

	if reader.mem == nil {
		reader.mem = memory.DefaultAllocator
	}
	reader.bldr = NewRecordBuilder(reader.mem, schema)

	// Pick the iteration strategy matching the configured chunk size.
	if reader.chunk < 0 {
		reader.next = reader.nextall
	} else if reader.chunk > 1 {
		reader.next = reader.nextn
	} else {
		reader.next = reader.next1
	}
	return reader
}
// Err returns the last encountered error (nil after a clean EOF).
func (r *JSONReader) Err() error { return r.err }
func (r *JSONReader) Schema() *arrow.Schema { return r.schema }
// Record returns the last read in record. The returned record is only valid
// until the next call to Next unless Retain is called on the record itself.
func (r *JSONReader) Record() arrow.Record { return r.cur }
// Retain increments the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (r *JSONReader) Retain() {
	atomic.AddInt64(&r.refs, 1)
}
// Release decrements the reference count by 1. When the count reaches
// zero, the current record (if any), the record builder, and the decoder
// are all released so their memory can be reclaimed.
// Release may be called simultaneously from multiple goroutines.
func (r *JSONReader) Release() {
	debug.Assert(atomic.LoadInt64(&r.refs) > 0, "too many releases")

	if atomic.AddInt64(&r.refs, -1) == 0 {
		if r.cur != nil {
			r.cur.Release()
		}
		// Release the builder and drop the decoder unconditionally.
		// Previously these were guarded by the r.cur != nil check, leaking
		// the RecordBuilder when a reader was released before ever
		// producing a record.
		r.bldr.Release()
		r.r = nil
	}
}
// Next returns true if it read in a record, which will be available via Record
// and false if there is either an error or the end of the reader.
func (r *JSONReader) Next() bool {
	// Drop the previously returned record; callers must Retain it if
	// they need it to outlive this call.
	if rec := r.cur; rec != nil {
		rec.Release()
		r.cur = nil
	}

	if r.done || r.err != nil {
		return false
	}

	return r.next()
}
// readNext decodes one json object into the record builder, reporting
// whether a row was successfully consumed. EOF marks the reader done
// without surfacing an error.
func (r *JSONReader) readNext() bool {
	r.err = r.r.Decode(r.bldr)
	if r.err == nil {
		return true
	}

	r.done = true
	if errors.Is(r.err, io.EOF) {
		// A clean end of input is not an error.
		r.err = nil
	}
	return false
}
// nextall drains the entire input stream into a single record batch.
// Selected when the configured chunk size is negative.
func (r *JSONReader) nextall() bool {
	for {
		if !r.readNext() {
			break
		}
	}

	r.cur = r.bldr.NewRecord()
	return r.cur.NumRows() > 0
}
// next1 reads a single object into a one-row record batch. This is the
// default strategy (chunk size of 1).
func (r *JSONReader) next1() bool {
	if r.readNext() {
		r.cur = r.bldr.NewRecord()
		return true
	}
	return false
}
// nextn reads up to r.chunk objects into a single record batch,
// stopping early at EOF or on a decode error. Selected when the
// configured chunk size is greater than 1.
func (r *JSONReader) nextn() bool {
	read := 0
	for read < r.chunk && !r.done {
		if !r.readNext() {
			break
		}
		read++
	}

	if read == 0 {
		return false
	}
	r.cur = r.bldr.NewRecord()
	return true
}
// Compile-time check that JSONReader satisfies the RecordReader interface.
var (
	_ RecordReader = (*JSONReader)(nil)
)